Selaa lähdekoodia

Merge branch 'improve_pipeline_balance' of https://github.com/benjaminplee/ibrowse into merge_pull_req_123

merge_pull_req_123
Chandrashekhar Mullaparthi 9 vuotta sitten
vanhempi
commit
8494e9433f
13 muutettua tiedostoa jossa 376 lisäystä ja 102 poistoa
  1. +1
    -0
      .gitignore
  2. +3
    -0
      CONTRIBUTORS
  3. +4
    -2
      Makefile
  4. +5
    -0
      include/ibrowse.hrl
  5. BIN
      rebar
  6. +8
    -3
      src/ibrowse.erl
  7. +1
    -1
      src/ibrowse_http_client.erl
  8. +11
    -16
      src/ibrowse_lb.erl
  9. +1
    -0
      src/ibrowse_lib.erl
  10. +96
    -35
      src/ibrowse_socks5.erl
  11. +174
    -0
      test/ibrowse_functional_tests.erl
  12. +11
    -9
      test/ibrowse_test.erl
  13. +61
    -36
      test/ibrowse_test_server.erl

+ 1
- 0
.gitignore Näytä tiedosto

@ -9,3 +9,4 @@ doc/edoc-info
Emakefile
*.bat
.dialyzer_plt
.rebar

+ 3
- 0
CONTRIBUTORS Näytä tiedosto

@ -9,9 +9,12 @@ In alphabetical order:
Adam Kocoloski
Andrew Tunnell-Jones
Anthony Molinaro
Benjamin P Lee (https://github.com/benjaminplee)
Benoit Chesneau (https://github.com/benoitc)
Brian Richards (http://github.com/richbria)
Chris Newcombe
Dan Kelley
Dan Schwabe (https://github.com/dfschwabe)
Derek Upham
Eric Merritt
Erik Reitsma

+ 4
- 2
Makefile Näytä tiedosto

@ -15,9 +15,11 @@ install: compile
mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
test: all
eunit_test: all
./rebar eunit
erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \
test: all
erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \
-s ibrowse_test verify_chunked_streaming \
-s ibrowse_test test_chunked_streaming_once \
-s erlang halt

+ 5
- 0
include/ibrowse.hrl Näytä tiedosto

@ -18,4 +18,9 @@
-record(ibrowse_conf, {key, value}).
-define(CONNECTIONS_LOCAL_TABLE, ibrowse_lb).
-define(LOAD_BALANCER_NAMED_TABLE, ibrowse_lb).
-define(CONF_TABLE, ibrowse_conf).
-define(STREAM_TABLE, ibrowse_stream).
-endif.

BIN
rebar Näytä tiedosto


+ 8
- 3
src/ibrowse.erl Näytä tiedosto

@ -651,7 +651,7 @@ show_dest_status() ->
io:format("~80.80.=s~n", [""]),
Metrics = get_metrics(),
lists:foreach(
fun({Host, Port, Lb_pid, Tid, Size}) ->
fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
[Host ++ ":" ++ integer_to_list(Port),
integer_to_list(Tid),
@ -697,7 +697,7 @@ get_metrics() ->
fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
case get_metrics(X_host, X_port) of
{_, _, _, _, _} = X_res ->
[X_res | X_acc];
[{X_host, X_port, X_res} | X_acc];
_X_res ->
X_acc
end
@ -708,7 +708,12 @@ get_metrics(Host, Port) ->
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of
{message_queue_len, Msg_q_len} ->
Msg_q_len;
_ ->
-1
end,
case Tid of
undefined ->
{Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};

+ 1
- 1
src/ibrowse_http_client.erl Näytä tiedosto

@ -2021,12 +2021,12 @@ dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
proc_state = Proc_state} = State) when Tid /= undefined,
Proc_state /= ?dead_proc_walking ->
Ts = os:timestamp(),
catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
(catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
[{'==', '$1', {const,self()}},
{'<', '$2', {const,Ts}}
],
[true]}])),
catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1};
dec_pipeline_counter(State) ->
State.

+ 11
- 16
src/ibrowse_lb.erl Näytä tiedosto

@ -124,17 +124,17 @@ handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_opt
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
Tid_size = ets:info(Tid, size),
case Tid_size > Max_sess of
case Tid_size >= Max_sess of
true ->
Reply = find_best_connection(Tid, Max_pipe, Tid_size),
Reply = find_best_connection(Tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
false ->
{ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
Ts = os:timestamp(),
ets:insert(Tid, {{0, Ts, Pid}, []}),
{reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}}
ets:insert(Tid, {{1, Ts, Pid}, []}),
{reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}}
end;
handle_call(Request, _From, State) ->
@ -215,18 +215,13 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe, _Num_cur) ->
find_best_connection(Tid, Max_pipe) ->
case ets:first(Tid) of
{Spec_size, Ts, Pid} = First ->
case Spec_size >= Max_pipe of
true ->
{error, retry_later};
false ->
ets:delete(Tid, First),
ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
{ok, First}
end;
'$end_of_table' ->
{Spec_size, Ts, Pid} = First when Spec_size < Max_pipe ->
ets:delete(Tid, First),
ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
{ok, First};
_ ->
{error, retry_later}
end.

+ 1
- 0
src/ibrowse_lib.erl Näytä tiedosto

@ -363,6 +363,7 @@ parse_url([], get_password, Url, TmpAcc) ->
parse_url([], State, Url, TmpAcc) ->
{invalid_uri_2, State, Url, TmpAcc}.
default_port(socks5) -> 1080;
default_port(http) -> 80;
default_port(https) -> 443;
default_port(ftp) -> 21.

+ 96
- 35
src/ibrowse_socks5.erl Näytä tiedosto

@ -1,47 +1,108 @@
-module(ibrowse_socks5).
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-export([connect/3]).
-module(ibrowse_socks5).
-define(TIMEOUT, 2000).
-define(VERSION, 5).
-define(CONNECT, 1).
-define(SOCKS5, 5).
-define(AUTH_METHOD_NO, 0).
-define(AUTH_METHOD_USERPASS, 2).
-define(ADDRESS_TYPE_IP4, 1).
-define(COMMAND_TYPE_TCPIP_STREAM, 1).
-define(RESERVER, 0).
-define(STATUS_GRANTED, 0).
-define(NO_AUTH, 0).
-define(USERPASS, 2).
-define(UNACCEPTABLE, 16#FF).
-define(RESERVED, 0).
connect(Host, Port, Options) ->
Socks5Host = proplists:get_value(socks5_host, Options),
Socks5Port = proplists:get_value(socks5_port, Options),
-define(ATYP_IPV4, 1).
-define(ATYP_DOMAINNAME, 3).
-define(ATYP_IPV6, 4).
{ok, Socket} = gen_tcp:connect(Socks5Host, Socks5Port, [binary, {packet, 0}, {keepalive, true}, {active, false}]),
-define(SUCCEEDED, 0).
{ok, _Bin} =
case proplists:get_value(socks5_user, Options, undefined) of
undefined ->
ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_NO>>),
{ok, <<?SOCKS5, ?AUTH_METHOD_NO>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT);
_Else ->
Socks5User = list_to_binary(proplists:get_value(socks5_user, Options)),
Socks5Pass = list_to_binary(proplists:get_value(socks5_pass, Options)),
-export([connect/5]).
ok = gen_tcp:send(Socket, <<?SOCKS5, 1, ?AUTH_METHOD_USERPASS>>),
{ok, <<?SOCKS5, ?AUTH_METHOD_USERPASS>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT),
-import(ibrowse_lib, [get_value/2, get_value/3]).
UserLength = byte_size(Socks5User),
connect(Host, Port, Options, SockOptions, Timeout) ->
Socks5Host = get_value(socks5_host, Options),
Socks5Port = get_value(socks5_port, Options),
case gen_tcp:connect(Socks5Host, Socks5Port, SockOptions, Timeout) of
{ok, Socket} ->
case handshake(Socket, Options) of
ok ->
case connect(Host, Port, Socket) of
ok ->
{ok, Socket};
Else ->
gen_tcp:close(Socket),
Else
end;
Else ->
gen_tcp:close(Socket),
Else
end;
Else ->
Else
end.
ok = gen_tcp:send(Socket, << 1, UserLength >>),
ok = gen_tcp:send(Socket, Socks5User),
PassLength = byte_size(Socks5Pass),
ok = gen_tcp:send(Socket, << PassLength >>),
ok = gen_tcp:send(Socket, Socks5Pass),
{ok, <<1, 0>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT)
handshake(Socket, Options) when is_port(Socket) ->
{Handshake, Success} = case get_value(socks5_user, Options, <<>>) of
<<>> ->
{<<?VERSION, 1, ?NO_AUTH>>, ?NO_AUTH};
User ->
Password = get_value(socks5_password, Options, <<>>),
{<<?VERSION, 1, ?USERPASS, (byte_size(User)), User,
(byte_size(Password)), Password>>, ?USERPASS}
end,
ok = gen_tcp:send(Socket, Handshake),
case gen_tcp:recv(Socket, 0) of
{ok, <<?VERSION, Success>>} ->
ok;
{ok, <<?VERSION, ?UNACCEPTABLE>>} ->
{error, unacceptable};
{error, Reason} ->
{error, Reason}
end.
{ok, {IP1,IP2,IP3,IP4}} = inet:getaddr(Host, inet),
connect(Host, Port, Via) when is_list(Host) ->
connect(list_to_binary(Host), Port, Via);
connect(Host, Port, Via) when is_binary(Host), is_integer(Port),
is_port(Via) ->
{AddressType, Address} = case inet:parse_address(binary_to_list(Host)) of
{ok, {IP1, IP2, IP3, IP4}} ->
{?ATYP_IPV4, <<IP1,IP2,IP3,IP4>>};
{ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} ->
{?ATYP_IPV6, <<IP1,IP2,IP3,IP4,IP5,IP6,IP7,IP8>>};
_ ->
HostLength = byte_size(Host),
{?ATYP_DOMAINNAME, <<HostLength,Host/binary>>}
end,
ok = gen_tcp:send(Via,
<<?VERSION, ?CONNECT, ?RESERVED,
AddressType, Address/binary,
(Port):16>>),
case gen_tcp:recv(Via, 0) of
{ok, <<?VERSION, ?SUCCEEDED, ?RESERVED, _/binary>>} ->
ok;
{ok, <<?VERSION, Rep, ?RESERVED, _/binary>>} ->
{error, rep(Rep)};
{error, Reason} ->
{error, Reason}
end.
ok = gen_tcp:send(Socket, <<?SOCKS5, ?COMMAND_TYPE_TCPIP_STREAM, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16>>),
{ok, << ?SOCKS5, ?STATUS_GRANTED, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16 >>} = gen_tcp:recv(Socket, 10, ?TIMEOUT),
{ok, Socket}.
rep(0) -> succeeded;
rep(1) -> server_fail;
rep(2) -> disallowed_by_ruleset;
rep(3) -> network_unreachable;
rep(4) -> host_unreachable;
rep(5) -> connection_refused;
rep(6) -> ttl_expired;
rep(7) -> command_not_supported;
rep(8) -> address_type_not_supported.

+ 174
- 0
test/ibrowse_functional_tests.erl Näytä tiedosto

@ -0,0 +1,174 @@
%%% File : ibrowse_functional_tests.erl
%%% Authors : Benjamin Lee <http://github.com/benjaminplee>
%%% Dan Schwabe <http://github.com/dfschwabe>
%%% Brian Richards <http://github.com/richbria>
%%% Description : Functional tests of the ibrowse library using a live test HTTP server
%%% Created : 18 November 2014 by Benjamin Lee <yardspoon@gmail.com>
-module(ibrowse_functional_tests).
-include_lib("eunit/include/eunit.hrl").
-define(PER_TEST_TIMEOUT_SEC, 60).
-define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}).
-define(SERVER_PORT, 8181).
-define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)).
-define(SHORT_TIMEOUT_MS, 5000).
-define(LONG_TIMEOUT_MS, 30000).
-define(PAUSE_FOR_CONNECTIONS_MS, 2000).
-compile(export_all).
setup() ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
ibrowse_test_server:start_server(?SERVER_PORT, tcp),
ibrowse:start(),
ok.
teardown(_) ->
ibrowse:stop(),
ibrowse_test_server:stop_server(?SERVER_PORT),
ok.
running_server_fixture_test_() ->
{foreach,
fun setup/0,
fun teardown/1,
[
?TIMEDTEST("Simple request can be honored", simple_request),
?TIMEDTEST("Slow server causes timeout", slow_server_timeout),
?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth),
?TIMEDTEST("Pipelines refill", pipeline_refill),
?TIMEDTEST("Timeout closes pipe", closing_pipes),
?TIMEDTEST("Requests are balanced over connections", balanced_connections),
?TIMEDTEST("Pipeline too small signals retries", small_pipeline),
?TIMEDTEST("Dest status can be gathered", status)
]
}.
simple_request() ->
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])).
slow_server_timeout() ->
?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)).
pipeline_depth() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsSent = 2,
EmptyPipelineDepth = 0,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts).
pipeline_refill() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsToFill = MaxSessions * MaxPipeline,
%% Send off enough requests to fill sessions and pipelines in rappid succession
Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsToFill, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
% Verify that connections properly reported their completed responses and can still accept more
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)),
% and do it again to make sure we really are clear
times(RequestsToFill, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
% Verify that connections properly reported their completed responses and can still accept more
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)).
closing_pipes() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsSent = 2,
BalancedNumberOfRequestsPerConnection = 1,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts),
timer:sleep(?SHORT_TIMEOUT_MS),
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()).
balanced_connections() ->
MaxSessions = 4,
MaxPipeline = 100,
RequestsSent = 80,
BalancedNumberOfRequestsPerConnection = 20,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
small_pipeline() ->
MaxSessions = 10,
MaxPipeline = 10,
RequestsSent = 100,
FullRequestsPerConnection = 10,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line
ibrowse:show_dest_status("localhost", 8181),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts),
Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS),
?assertEqual({error, retry_later}, Response).
status() ->
MaxSessions = 10,
MaxPipeline = 10,
RequestsSent = 100,
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line
ibrowse:show_dest_status(),
ibrowse:show_dest_status("http://localhost:8181").
times(0, _) ->
ok;
times(X, Fun) ->
Fun(),
times(X - 1, Fun).

+ 11
- 9
test/ibrowse_test.erl Näytä tiedosto

@ -40,7 +40,7 @@
test_retry_of_requests/1
]).
-include("ibrowse.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
test_stream_once(Url, Method, Options) ->
test_stream_once(Url, Method, Options, 5000).
@ -257,17 +257,17 @@ dump_errors(Key, Iod) ->
] ++ ?LOCAL_TESTS).
local_unit_tests() ->
error_logger:tty(false),
unit_tests([], ?LOCAL_TESTS),
error_logger:tty(true).
unit_tests([], ?LOCAL_TESTS).
unit_tests() ->
unit_tests([], ?TEST_LIST).
error_logger:tty(false),
unit_tests([], ?TEST_LIST),
error_logger:tty(true).
unit_tests(Options, Test_list) ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
application:ensure_all_started(ssl),
(catch ibrowse_test_server:start_server(8181, tcp)),
application:start(ibrowse),
Options_1 = Options ++ [{connect_timeout, 5000}],
@ -387,6 +387,8 @@ wait_for_resp(Pid) ->
{'EXIT', Reason};
{'DOWN', _, _, _, _} ->
wait_for_resp(Pid);
{'EXIT', _, normal} ->
wait_for_resp(Pid);
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
@ -564,6 +566,7 @@ test_retry_of_requests() ->
test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
test_retry_of_requests(Url) ->
reset_ibrowse(),
Timeout_1 = 2050,
Res_1 = test_retry_of_requests(Url, Timeout_1),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@ -586,7 +589,6 @@ test_retry_of_requests(Url) ->
_ ->
exit({failed, Timeout_1, Res_1})
end,
reset_ibrowse(),
Timeout_2 = 2200,
Res_2 = test_retry_of_requests(Url, Timeout_2),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
@ -604,10 +606,10 @@ test_retry_of_requests(Url) ->
true ->
ok;
false ->
exit({failed, Timeout_2, Res_2})
exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end;
_ ->
exit({failed, Timeout_2, Res_2})
exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end,
success.

+ 61
- 36
test/ibrowse_test_server.erl Näytä tiedosto

@ -1,49 +1,60 @@
%%% File : ibrowse_test_server.erl
%%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%% Benjamin Lee <http://github.com/benjaminplee>
%%% Dan Schwabe <http://github.com/dfschwabe>
%%% Brian Richards <http://github.com/richbria>
%%% Description : A server to simulate various test scenarios
%%% Created : 17 Oct 2010 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
-module(ibrowse_test_server).
-export([
start_server/2,
stop_server/1
stop_server/1,
get_conn_pipeline_depth/0
]).
-record(request, {method, uri, version, headers = [], body = []}).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
-define(ACCEPT_TIMEOUT_MS, 1000).
-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth).
start_server(Port, Sock_type) ->
Fun = fun() ->
Proc_name = server_proc_name(Port),
case whereis(Proc_name) of
undefined ->
register(Proc_name, self()),
case do_listen(Sock_type, Port, [{active, false},
{reuseaddr, true},
{nodelay, true},
{packet, http}]) of
{ok, Sock} ->
do_trace("Server listening on port: ~p~n", [Port]),
accept_loop(Sock, Sock_type);
Err ->
erlang:error(
lists:flatten(
io_lib:format(
"Failed to start server on port ~p. ~p~n",
[Port, Err]))),
exit({listen_error, Err})
end;
_X ->
ok
end
end,
Proc_name = server_proc_name(Port),
case whereis(Proc_name) of
undefined ->
register(Proc_name, self()),
ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
case do_listen(Sock_type, Port, [{active, false},
{reuseaddr, true},
{nodelay, true},
{packet, http}]) of
{ok, Sock} ->
do_trace("Server listening on port: ~p~n", [Port]),
accept_loop(Sock, Sock_type);
Err ->
erlang:error(
lists:flatten(
io_lib:format(
"Failed to start server on port ~p. ~p~n",
[Port, Err]))),
exit({listen_error, Err})
end;
_X ->
ok
end
end,
spawn_link(Fun).
stop_server(Port) ->
server_proc_name(Port) ! stop,
timer:sleep(2000), % wait for server to receive msg and unregister
ok.
get_conn_pipeline_depth() ->
ets:tab2list(?CONN_PIPELINE_DEPTH).
server_proc_name(Port) ->
list_to_atom("ibrowse_test_server_"++integer_to_list(Port)).
@ -55,24 +66,36 @@ do_listen(ssl, Port, Opts) ->
ssl:listen(Port, Opts).
do_accept(tcp, Listen_sock) ->
gen_tcp:accept(Listen_sock);
gen_tcp:accept(Listen_sock, ?ACCEPT_TIMEOUT_MS);
do_accept(ssl, Listen_sock) ->
ssl:ssl_accept(Listen_sock).
ssl:ssl_accept(Listen_sock, ?ACCEPT_TIMEOUT_MS).
accept_loop(Sock, Sock_type) ->
case do_accept(Sock_type, Sock) of
{ok, Conn} ->
Pid = spawn_link(
fun() ->
server_loop(Conn, Sock_type, #request{})
end),
Pid = spawn_link(fun() -> connection(Conn, Sock_type) end),
set_controlling_process(Conn, Sock_type, Pid),
Pid ! {setopts, [{active, true}]},
accept_loop(Sock, Sock_type);
{error, timeout} ->
receive
stop ->
ok
after 10 ->
accept_loop(Sock, Sock_type)
end;
Err ->
Err
end.
connection(Conn, Sock_type) ->
catch ets:insert(?CONN_PIPELINE_DEPTH, {self(), 0}),
try
server_loop(Conn, Sock_type, #request{})
after
catch ets:delete(?CONN_PIPELINE_DEPTH, self())
end.
set_controlling_process(Sock, tcp, Pid) ->
gen_tcp:controlling_process(Sock, Pid);
set_controlling_process(Sock, ssl, Pid) ->
@ -86,6 +109,7 @@ setopts(Sock, ssl, Opts) ->
server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
receive
{http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} ->
catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
server_loop(Sock, Sock_type, Req#request{method = HttpMethod,
uri = HttpUri,
version = HttpVersion});
@ -95,9 +119,12 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
case process_request(Sock, Sock_type, Req) of
close_connection ->
gen_tcp:shutdown(Sock, read_write);
not_done ->
ok;
_ ->
server_loop(Sock, Sock_type, #request{})
end;
catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
end,
server_loop(Sock, Sock_type, #request{});
{http, Sock, {http_error, Err}} ->
io:format("Error parsing HTTP request:~n"
"Req so far : ~p~n"
@ -109,8 +136,6 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
{tcp_closed, Sock} ->
do_trace("Client closed connection~n", []),
ok;
stop ->
ok;
Other ->
io:format("Recvd unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
@ -163,7 +188,6 @@ process_request(Sock, Sock_type,
uri = {abs_path, "/ibrowse_head_transfer_enc"}}) ->
Resp = <<"HTTP/1.1 400 Bad Request\r\nServer: Apache-Coyote/1.1\r\nContent-Length:5\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\n\r\nabcde">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='GET',
headers = Headers,
@ -215,6 +239,8 @@ process_request(Sock, Sock_type,
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp),
close_connection;
process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) ->
not_done;
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
@ -226,7 +252,6 @@ do_send(Sock, tcp, Resp) ->
do_send(Sock, ssl, Resp) ->
ssl:send(Sock, Resp).
%%------------------------------------------------------------------------------
%% Utility functions
%%------------------------------------------------------------------------------

Ladataan…
Peruuta
Tallenna