diff --git a/.gitignore b/.gitignore index 12f55c8..1491146 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ doc/edoc-info Emakefile *.bat .dialyzer_plt +.rebar diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 21e8d06..665e64b 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -9,9 +9,12 @@ In alphabetical order: Adam Kocoloski Andrew Tunnell-Jones Anthony Molinaro +Benjamin P Lee (https://github.com/benjaminplee) Benoit Chesneau (https://github.com/benoitc) +Brian Richards (http://github.com/richbria) Chris Newcombe Dan Kelley +Dan Schwabe (https://github.com/dfschwabe) Derek Upham Eric Merritt Erik Reitsma diff --git a/Makefile b/Makefile index b596b64..28dfda8 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,11 @@ install: compile mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/ cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/ -test: all +eunit_test: all ./rebar eunit - erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \ + +test: all + erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \ -s ibrowse_test verify_chunked_streaming \ -s ibrowse_test test_chunked_streaming_once \ -s erlang halt diff --git a/include/ibrowse.hrl b/include/ibrowse.hrl index 150b1b7..5da1f0d 100644 --- a/include/ibrowse.hrl +++ b/include/ibrowse.hrl @@ -18,4 +18,9 @@ -record(ibrowse_conf, {key, value}). +-define(CONNECTIONS_LOCAL_TABLE, ibrowse_lb). +-define(LOAD_BALANCER_NAMED_TABLE, ibrowse_lb). +-define(CONF_TABLE, ibrowse_conf). +-define(STREAM_TABLE, ibrowse_stream). + -endif. diff --git a/rebar b/rebar index b9c73ff..8e4deb6 100755 Binary files a/rebar and b/rebar differ diff --git a/src/ibrowse.erl b/src/ibrowse.erl index fbb4b83..51fcb86 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -651,7 +651,7 @@ show_dest_status() -> io:format("~80.80.=s~n", [""]), Metrics = get_metrics(), lists:foreach( - fun({Host, Port, Lb_pid, Tid, Size}) -> + fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) -> io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n", [Host ++ ":" ++ integer_to_list(Port), integer_to_list(Tid), @@ -697,7 +697,7 @@ get_metrics() -> fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) -> case get_metrics(X_host, X_port) of {_, _, _, _, _} = X_res -> - [X_res | X_acc]; + [{X_host, X_port, X_res} | X_acc]; _X_res -> X_acc end @@ -708,7 +708,12 @@ get_metrics(Host, Port) -> [] -> no_active_processes; [#lb_pid{pid = Lb_pid, ets_tid = Tid}] -> - MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), + MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of + {message_queue_len, Msg_q_len} -> + Msg_q_len; + _ -> + -1 + end, case Tid of undefined -> {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}}; diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index db9559a..92e4964 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -2021,12 +2021,12 @@ dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, proc_state = Proc_state} = State) when Tid /= undefined, Proc_state /= ?dead_proc_walking -> Ts = os:timestamp(), + catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}), (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'}, [{'==', '$1', {const,self()}}, {'<', '$2', {const,Ts}} ], [true]}])), - catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}), State#state{cur_pipeline_size = Pipe_sz - 1}; dec_pipeline_counter(State) -> State. diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index 88b169b..894d8ad 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -124,17 +124,17 @@ handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_opt State_1 = maybe_create_ets(State), Tid = State_1#state.ets_tid, Tid_size = ets:info(Tid, size), - case Tid_size > Max_sess of + case Tid_size >= Max_sess of true -> - Reply = find_best_connection(Tid, Max_pipe, Tid_size), + Reply = find_best_connection(Tid, Max_pipe), {reply, Reply, State_1#state{max_sessions = Max_sess, max_pipeline_size = Max_pipe}}; false -> {ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options), Ts = os:timestamp(), - ets:insert(Tid, {{0, Ts, Pid}, []}), - {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions = Max_sess, - max_pipeline_size = Max_pipe}} + ets:insert(Tid, {{1, Ts, Pid}, []}), + {reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions = Max_sess, + max_pipeline_size = Max_pipe}} end; handle_call(Request, _From, State) -> @@ -215,18 +215,13 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -find_best_connection(Tid, Max_pipe, _Num_cur) -> +find_best_connection(Tid, Max_pipe) -> case ets:first(Tid) of - {Spec_size, Ts, Pid} = First -> - case Spec_size >= Max_pipe of - true -> - {error, retry_later}; - false -> - ets:delete(Tid, First), - ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}), - {ok, First} - end; - '$end_of_table' -> + {Spec_size, Ts, Pid} = First when Spec_size < Max_pipe -> + ets:delete(Tid, First), + ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}), + {ok, First}; + _ -> {error, retry_later} end. diff --git a/src/ibrowse_lib.erl b/src/ibrowse_lib.erl index 1098b0f..1f0a61a 100644 --- a/src/ibrowse_lib.erl +++ b/src/ibrowse_lib.erl @@ -363,6 +363,7 @@ parse_url([], get_password, Url, TmpAcc) -> parse_url([], State, Url, TmpAcc) -> {invalid_uri_2, State, Url, TmpAcc}. +default_port(socks5) -> 1080; default_port(http) -> 80; default_port(https) -> 443; default_port(ftp) -> 21. diff --git a/src/ibrowse_socks5.erl b/src/ibrowse_socks5.erl index 41d57f2..417f595 100644 --- a/src/ibrowse_socks5.erl +++ b/src/ibrowse_socks5.erl @@ -1,47 +1,108 @@ --module(ibrowse_socks5). +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. --export([connect/3]). +-module(ibrowse_socks5). --define(TIMEOUT, 2000). +-define(VERSION, 5). +-define(CONNECT, 1). --define(SOCKS5, 5). --define(AUTH_METHOD_NO, 0). --define(AUTH_METHOD_USERPASS, 2). --define(ADDRESS_TYPE_IP4, 1). --define(COMMAND_TYPE_TCPIP_STREAM, 1). --define(RESERVER, 0). --define(STATUS_GRANTED, 0). +-define(NO_AUTH, 0). +-define(USERPASS, 2). +-define(UNACCEPTABLE, 16#FF). +-define(RESERVED, 0). -connect(Host, Port, Options) -> - Socks5Host = proplists:get_value(socks5_host, Options), - Socks5Port = proplists:get_value(socks5_port, Options), +-define(ATYP_IPV4, 1). +-define(ATYP_DOMAINNAME, 3). +-define(ATYP_IPV6, 4). - {ok, Socket} = gen_tcp:connect(Socks5Host, Socks5Port, [binary, {packet, 0}, {keepalive, true}, {active, false}]), +-define(SUCCEEDED, 0). - {ok, _Bin} = - case proplists:get_value(socks5_user, Options, undefined) of - undefined -> - ok = gen_tcp:send(Socket, <>), - {ok, <>} = gen_tcp:recv(Socket, 2, ?TIMEOUT); - _Else -> - Socks5User = list_to_binary(proplists:get_value(socks5_user, Options)), - Socks5Pass = list_to_binary(proplists:get_value(socks5_pass, Options)), +-export([connect/5]). - ok = gen_tcp:send(Socket, <>), - {ok, <>} = gen_tcp:recv(Socket, 2, ?TIMEOUT), +-import(ibrowse_lib, [get_value/2, get_value/3]). - UserLength = byte_size(Socks5User), +connect(Host, Port, Options, SockOptions, Timeout) -> + Socks5Host = get_value(socks5_host, Options), + Socks5Port = get_value(socks5_port, Options), + case gen_tcp:connect(Socks5Host, Socks5Port, SockOptions, Timeout) of + {ok, Socket} -> + case handshake(Socket, Options) of + ok -> + case connect(Host, Port, Socket) of + ok -> + {ok, Socket}; + Else -> + gen_tcp:close(Socket), + Else + end; + Else -> + gen_tcp:close(Socket), + Else + end; + Else -> + Else + end. - ok = gen_tcp:send(Socket, << 1, UserLength >>), - ok = gen_tcp:send(Socket, Socks5User), - PassLength = byte_size(Socks5Pass), - ok = gen_tcp:send(Socket, << PassLength >>), - ok = gen_tcp:send(Socket, Socks5Pass), - {ok, <<1, 0>>} = gen_tcp:recv(Socket, 2, ?TIMEOUT) +handshake(Socket, Options) when is_port(Socket) -> + {Handshake, Success} = case get_value(socks5_user, Options, <<>>) of + <<>> -> + {<>, ?NO_AUTH}; + User -> + Password = get_value(socks5_password, Options, <<>>), + {<>, ?USERPASS} end, + ok = gen_tcp:send(Socket, Handshake), + case gen_tcp:recv(Socket, 0) of + {ok, <>} -> + ok; + {ok, <>} -> + {error, unacceptable}; + {error, Reason} -> + {error, Reason} + end. - {ok, {IP1,IP2,IP3,IP4}} = inet:getaddr(Host, inet), +connect(Host, Port, Via) when is_list(Host) -> + connect(list_to_binary(Host), Port, Via); +connect(Host, Port, Via) when is_binary(Host), is_integer(Port), + is_port(Via) -> + {AddressType, Address} = case inet:parse_address(binary_to_list(Host)) of + {ok, {IP1, IP2, IP3, IP4}} -> + {?ATYP_IPV4, <>}; + {ok, {IP1, IP2, IP3, IP4, IP5, IP6, IP7, IP8}} -> + {?ATYP_IPV6, <>}; + _ -> + HostLength = byte_size(Host), + {?ATYP_DOMAINNAME, <>} + end, + ok = gen_tcp:send(Via, + <>), + case gen_tcp:recv(Via, 0) of + {ok, <>} -> + ok; + {ok, <>} -> + {error, rep(Rep)}; + {error, Reason} -> + {error, Reason} + end. - ok = gen_tcp:send(Socket, <>), - {ok, << ?SOCKS5, ?STATUS_GRANTED, ?RESERVER, ?ADDRESS_TYPE_IP4, IP1, IP2, IP3, IP4, Port:16 >>} = gen_tcp:recv(Socket, 10, ?TIMEOUT), - {ok, Socket}. +rep(0) -> succeeded; +rep(1) -> server_fail; +rep(2) -> disallowed_by_ruleset; +rep(3) -> network_unreachable; +rep(4) -> host_unreachable; +rep(5) -> connection_refused; +rep(6) -> ttl_expired; +rep(7) -> command_not_supported; +rep(8) -> address_type_not_supported. diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl new file mode 100644 index 0000000..3517011 --- /dev/null +++ b/test/ibrowse_functional_tests.erl @@ -0,0 +1,174 @@ +%%% File : ibrowse_functional_tests.erl +%%% Authors : Benjamin Lee +%%% Dan Schwabe +%%% Brian Richards +%%% Description : Functional tests of the ibrowse library using a live test HTTP server +%%% Created : 18 November 2014 by Benjamin Lee + +-module(ibrowse_functional_tests). + +-include_lib("eunit/include/eunit.hrl"). +-define(PER_TEST_TIMEOUT_SEC, 60). +-define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}). + +-define(SERVER_PORT, 8181). +-define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)). +-define(SHORT_TIMEOUT_MS, 5000). +-define(LONG_TIMEOUT_MS, 30000). +-define(PAUSE_FOR_CONNECTIONS_MS, 2000). + +-compile(export_all). + +setup() -> + application:start(crypto), + application:start(public_key), + application:start(ssl), + ibrowse_test_server:start_server(?SERVER_PORT, tcp), + ibrowse:start(), + ok. + +teardown(_) -> + ibrowse:stop(), + ibrowse_test_server:stop_server(?SERVER_PORT), + ok. + +running_server_fixture_test_() -> + {foreach, + fun setup/0, + fun teardown/1, + [ + ?TIMEDTEST("Simple request can be honored", simple_request), + ?TIMEDTEST("Slow server causes timeout", slow_server_timeout), + ?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth), + ?TIMEDTEST("Pipelines refill", pipeline_refill), + ?TIMEDTEST("Timeout closes pipe", closing_pipes), + ?TIMEDTEST("Requests are balanced over connections", balanced_connections), + ?TIMEDTEST("Pipeline too small signals retries", small_pipeline), + ?TIMEDTEST("Dest status can be gathered", status) + ] + }. + +simple_request() -> + ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])). + +slow_server_timeout() -> + ?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)). + +pipeline_depth() -> + MaxSessions = 2, + MaxPipeline = 2, + RequestsSent = 2, + EmptyPipelineDepth = 0, + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()), + + Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end, + times(RequestsSent, fun() -> spawn_link(Fun) end), + + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), + + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), + ?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts). + +pipeline_refill() -> + MaxSessions = 2, + MaxPipeline = 2, + RequestsToFill = MaxSessions * MaxPipeline, + + %% Send off enough requests to fill sessions and pipelines in rappid succession + Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end, + times(RequestsToFill, fun() -> spawn_link(Fun) end), + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), + + % Verify that connections properly reported their completed responses and can still accept more + ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)), + + % and do it again to make sure we really are clear + times(RequestsToFill, fun() -> spawn_link(Fun) end), + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), + + % Verify that connections properly reported their completed responses and can still accept more + ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)). + +closing_pipes() -> + MaxSessions = 2, + MaxPipeline = 2, + RequestsSent = 2, + BalancedNumberOfRequestsPerConnection = 1, + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()), + + Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end, + times(RequestsSent, fun() -> spawn_link(Fun) end), + + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), + + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), + ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts), + + timer:sleep(?SHORT_TIMEOUT_MS), + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()). + +balanced_connections() -> + MaxSessions = 4, + MaxPipeline = 100, + RequestsSent = 80, + BalancedNumberOfRequestsPerConnection = 20, + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()), + + Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end, + times(RequestsSent, fun() -> spawn_link(Fun) end), + + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), + + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), + + ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts). + +small_pipeline() -> + MaxSessions = 10, + MaxPipeline = 10, + RequestsSent = 100, + FullRequestsPerConnection = 10, + + ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()), + + Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end, + times(RequestsSent, fun() -> spawn(Fun) end), + + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line + + ibrowse:show_dest_status("localhost", 8181), + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), + + ?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts), + + Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS), + + ?assertEqual({error, retry_later}, Response). + +status() -> + MaxSessions = 10, + MaxPipeline = 10, + RequestsSent = 100, + + Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end, + times(RequestsSent, fun() -> spawn(Fun) end), + + timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line + + ibrowse:show_dest_status(), + ibrowse:show_dest_status("http://localhost:8181"). + + +times(0, _) -> + ok; +times(X, Fun) -> + Fun(), + times(X - 1, Fun). diff --git a/test/ibrowse_test.erl b/test/ibrowse_test.erl index e216e82..0787493 100644 --- a/test/ibrowse_test.erl +++ b/test/ibrowse_test.erl @@ -40,7 +40,7 @@ test_retry_of_requests/1 ]). --include("ibrowse.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). test_stream_once(Url, Method, Options) -> test_stream_once(Url, Method, Options, 5000). @@ -257,17 +257,17 @@ dump_errors(Key, Iod) -> ] ++ ?LOCAL_TESTS). local_unit_tests() -> - error_logger:tty(false), - unit_tests([], ?LOCAL_TESTS), - error_logger:tty(true). + unit_tests([], ?LOCAL_TESTS). unit_tests() -> - unit_tests([], ?TEST_LIST). + error_logger:tty(false), + unit_tests([], ?TEST_LIST), + error_logger:tty(true). unit_tests(Options, Test_list) -> application:start(crypto), application:start(public_key), - application:start(ssl), + application:ensure_all_started(ssl), (catch ibrowse_test_server:start_server(8181, tcp)), application:start(ibrowse), Options_1 = Options ++ [{connect_timeout, 5000}], @@ -387,6 +387,8 @@ wait_for_resp(Pid) -> {'EXIT', Reason}; {'DOWN', _, _, _, _} -> wait_for_resp(Pid); + {'EXIT', _, normal} -> + wait_for_resp(Pid); Msg -> io:format("Recvd unknown message: ~p~n", [Msg]), wait_for_resp(Pid) @@ -564,6 +566,7 @@ test_retry_of_requests() -> test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay"). test_retry_of_requests(Url) -> + reset_ibrowse(), Timeout_1 = 2050, Res_1 = test_retry_of_requests(Url, Timeout_1), case lists:filter(fun({_Pid, {ok, "200", _, _}}) -> @@ -586,7 +589,6 @@ test_retry_of_requests(Url) -> _ -> exit({failed, Timeout_1, Res_1}) end, - reset_ibrowse(), Timeout_2 = 2200, Res_2 = test_retry_of_requests(Url, Timeout_2), case lists:filter(fun({_Pid, {ok, "200", _, _}}) -> @@ -604,10 +606,10 @@ test_retry_of_requests(Url) -> true -> ok; false -> - exit({failed, Timeout_2, Res_2}) + exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2}) end; _ -> - exit({failed, Timeout_2, Res_2}) + exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2}) end, success. diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index 1d72210..7025286 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -1,49 +1,60 @@ %%% File : ibrowse_test_server.erl %%% Author : Chandrashekhar Mullaparthi +%%% Benjamin Lee +%%% Dan Schwabe +%%% Brian Richards %%% Description : A server to simulate various test scenarios %%% Created : 17 Oct 2010 by Chandrashekhar Mullaparthi -module(ibrowse_test_server). -export([ start_server/2, - stop_server/1 + stop_server/1, + get_conn_pipeline_depth/0 ]). -record(request, {method, uri, version, headers = [], body = []}). -define(dec2hex(X), erlang:integer_to_list(X, 16)). +-define(ACCEPT_TIMEOUT_MS, 1000). +-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth). start_server(Port, Sock_type) -> Fun = fun() -> - Proc_name = server_proc_name(Port), - case whereis(Proc_name) of - undefined -> - register(Proc_name, self()), - case do_listen(Sock_type, Port, [{active, false}, - {reuseaddr, true}, - {nodelay, true}, - {packet, http}]) of - {ok, Sock} -> - do_trace("Server listening on port: ~p~n", [Port]), - accept_loop(Sock, Sock_type); - Err -> - erlang:error( - lists:flatten( - io_lib:format( - "Failed to start server on port ~p. ~p~n", - [Port, Err]))), - exit({listen_error, Err}) - end; - _X -> - ok - end - end, + Proc_name = server_proc_name(Port), + case whereis(Proc_name) of + undefined -> + register(Proc_name, self()), + ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]), + case do_listen(Sock_type, Port, [{active, false}, + {reuseaddr, true}, + {nodelay, true}, + {packet, http}]) of + {ok, Sock} -> + do_trace("Server listening on port: ~p~n", [Port]), + accept_loop(Sock, Sock_type); + Err -> + erlang:error( + lists:flatten( + io_lib:format( + "Failed to start server on port ~p. ~p~n", + [Port, Err]))), + exit({listen_error, Err}) + end; + _X -> + ok + end + end, spawn_link(Fun). stop_server(Port) -> server_proc_name(Port) ! stop, + timer:sleep(2000), % wait for server to receive msg and unregister ok. +get_conn_pipeline_depth() -> + ets:tab2list(?CONN_PIPELINE_DEPTH). + server_proc_name(Port) -> list_to_atom("ibrowse_test_server_"++integer_to_list(Port)). @@ -55,24 +66,36 @@ do_listen(ssl, Port, Opts) -> ssl:listen(Port, Opts). do_accept(tcp, Listen_sock) -> - gen_tcp:accept(Listen_sock); + gen_tcp:accept(Listen_sock, ?ACCEPT_TIMEOUT_MS); do_accept(ssl, Listen_sock) -> - ssl:ssl_accept(Listen_sock). + ssl:ssl_accept(Listen_sock, ?ACCEPT_TIMEOUT_MS). accept_loop(Sock, Sock_type) -> case do_accept(Sock_type, Sock) of {ok, Conn} -> - Pid = spawn_link( - fun() -> - server_loop(Conn, Sock_type, #request{}) - end), + Pid = spawn_link(fun() -> connection(Conn, Sock_type) end), set_controlling_process(Conn, Sock_type, Pid), Pid ! {setopts, [{active, true}]}, accept_loop(Sock, Sock_type); + {error, timeout} -> + receive + stop -> + ok + after 10 -> + accept_loop(Sock, Sock_type) + end; Err -> Err end. +connection(Conn, Sock_type) -> + catch ets:insert(?CONN_PIPELINE_DEPTH, {self(), 0}), + try + server_loop(Conn, Sock_type, #request{}) + after + catch ets:delete(?CONN_PIPELINE_DEPTH, self()) + end. + set_controlling_process(Sock, tcp, Pid) -> gen_tcp:controlling_process(Sock, Pid); set_controlling_process(Sock, ssl, Pid) -> @@ -86,6 +109,7 @@ setopts(Sock, ssl, Opts) -> server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> receive {http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} -> + catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1), server_loop(Sock, Sock_type, Req#request{method = HttpMethod, uri = HttpUri, version = HttpVersion}); @@ -95,9 +119,12 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> case process_request(Sock, Sock_type, Req) of close_connection -> gen_tcp:shutdown(Sock, read_write); + not_done -> + ok; _ -> - server_loop(Sock, Sock_type, #request{}) - end; + catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1) + end, + server_loop(Sock, Sock_type, #request{}); {http, Sock, {http_error, Err}} -> io:format("Error parsing HTTP request:~n" "Req so far : ~p~n" @@ -109,8 +136,6 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> {tcp_closed, Sock} -> do_trace("Client closed connection~n", []), ok; - stop -> - ok; Other -> io:format("Recvd unknown msg: ~p~n", [Other]), exit({unknown_msg, Other}) @@ -163,7 +188,6 @@ process_request(Sock, Sock_type, uri = {abs_path, "/ibrowse_head_transfer_enc"}}) -> Resp = <<"HTTP/1.1 400 Bad Request\r\nServer: Apache-Coyote/1.1\r\nContent-Length:5\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\n\r\nabcde">>, do_send(Sock, Sock_type, Resp); - process_request(Sock, Sock_type, #request{method='GET', headers = Headers, @@ -215,6 +239,8 @@ process_request(Sock, Sock_type, Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>, do_send(Sock, Sock_type, Resp), close_connection; +process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) -> + not_done; process_request(Sock, Sock_type, Req) -> do_trace("Recvd req: ~p~n", [Req]), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, @@ -226,7 +252,6 @@ do_send(Sock, tcp, Resp) -> do_send(Sock, ssl, Resp) -> ssl:send(Sock, Resp). - %%------------------------------------------------------------------------------ %% Utility functions %%------------------------------------------------------------------------------