diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl index c2bae86..0a68e14 100644 --- a/test/ibrowse_functional_tests.erl +++ b/test/ibrowse_functional_tests.erl @@ -32,7 +32,8 @@ running_server_fixture_test_() -> fun teardown/1, [ ?TIMEDTEST("Simple request can be honored", simple_request), - ?TIMEDTEST("Slow server causes timeout", slow_server_timeout) + ?TIMEDTEST("Slow server causes timeout", slow_server_timeout), + ?TIMEDTEST("Requests are balanced over connections", balanced_connections) ] }. @@ -41,3 +42,32 @@ simple_request() -> slow_server_timeout() -> ?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)). + +balanced_connections() -> + MaxSessions = 4, + MaxPipeline = 100, + RequestsSent = 80, + BalancedNumberOfRequestsPerConnection = 20, + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()), + + Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], 30000) end, + times(RequestsSent, fun() -> spawn_link(Fun) end), + + timer:sleep(1000), + + Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Diffs)), + + lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs). + +close_to_zero(0) -> yep; +close_to_zero(-1) -> yep; +close_to_zero(1) -> yep; +close_to_zero(X) -> {nope, X}. + +times(0, _) -> + ok; +times(X, Fun) -> + Fun(), + times(X - 1, Fun). diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index c102431..940552a 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -8,18 +8,21 @@ -module(ibrowse_test_server). -export([ start_server/2, - stop_server/1 + stop_server/1, + get_conn_pipeline_depth/0 ]). -record(request, {method, uri, version, headers = [], body = []}). -define(dec2hex(X), erlang:integer_to_list(X, 16)). -define(ACCEPT_TIMEOUT_MS, 1000). +-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth). start_server(Port, Sock_type) -> Fun = fun() -> Name = server_proc_name(Port), register(Name, self()), + ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]), case do_listen(Sock_type, Port, [{active, false}, {reuseaddr, true}, {nodelay, true}, @@ -44,6 +47,9 @@ stop_server(Port) -> timer:sleep(2000), % wait for server to receive msg and unregister ok. +get_conn_pipeline_depth() -> + ets:tab2list(?CONN_PIPELINE_DEPTH). + server_proc_name(Port) -> list_to_atom("ibrowse_test_server_"++integer_to_list(Port)). @@ -66,6 +72,7 @@ accept_loop(Sock, Sock_type) -> fun() -> server_loop(Conn, Sock_type, #request{}) end), + ets:insert(?CONN_PIPELINE_DEPTH, {Pid, 0}), set_controlling_process(Conn, Sock_type, Pid), Pid ! {setopts, [{active, true}]}, accept_loop(Sock, Sock_type); @@ -99,6 +106,7 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> {http, Sock, {http_header, _, _, _, _} = H} -> server_loop(Sock, Sock_type, Req#request{headers = [H | Headers]}); {http, Sock, http_eoh} -> + ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1), process_request(Sock, Sock_type, Req), server_loop(Sock, Sock_type, #request{}); {http, Sock, {http_error, Err}} ->