瀏覽代碼

Merge branch 'merge_pull_req_123'

pull/140/head
Chandrashekhar Mullaparthi 9 年之前
父節點
當前提交
ae6a91855b
共有 14 個文件被更改,包括 876 次插入302 次删除
  1. +1
    -0
      .gitignore
  2. +3
    -0
      CONTRIBUTORS
  3. +5
    -2
      Makefile
  4. +1
    -1
      README.md
  5. +6
    -1
      include/ibrowse.hrl
  6. 二進制
      rebar
  7. +90
    -65
      src/ibrowse.erl
  8. +158
    -93
      src/ibrowse_http_client.erl
  9. +39
    -69
      src/ibrowse_lb.erl
  10. +7
    -3
      src/ibrowse_lib.erl
  11. +174
    -0
      test/ibrowse_functional_tests.erl
  12. +181
    -0
      test/ibrowse_load_test.erl
  13. +126
    -29
      test/ibrowse_test.erl
  14. +85
    -39
      test/ibrowse_test_server.erl

+ 1
- 0
.gitignore 查看文件

@ -9,3 +9,4 @@ doc/edoc-info
Emakefile
*.bat
.dialyzer_plt
.rebar

+ 3
- 0
CONTRIBUTORS 查看文件

@ -9,9 +9,12 @@ In alphabetical order:
Adam Kocoloski
Andrew Tunnell-Jones
Anthony Molinaro
Benjamin P Lee (https://github.com/benjaminplee)
Benoit Chesneau (https://github.com/benoitc)
Brian Richards (http://github.com/richbria)
Chris Newcombe
Dan Kelley
Dan Schwabe (https://github.com/dfschwabe)
Derek Upham
Eric Merritt
Erik Reitsma

+ 5
- 2
Makefile 查看文件

@ -15,9 +15,12 @@ install: compile
mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
test: all
eunit_test: all
./rebar eunit
erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \
test: all
cd test; erl -pa ../../ibrowse/ebin -make; cd ../; \
erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \
-s ibrowse_test verify_chunked_streaming \
-s ibrowse_test test_chunked_streaming_once \
-s erlang halt

+ 1
- 1
README.md 查看文件

@ -1,4 +1,4 @@
# ibrowse [![Build Status](https://secure.travis-ci.org/johannesh/ibrowse.png)](http://travis-ci.org/johannesh/ibrowse)
# ibrowse [![Build Status](https://secure.travis-ci.org/cmullaparthi/ibrowse.png)](http://travis-ci.org/cmullaparthi/ibrowse)
ibrowse is a HTTP client written in erlang.

+ 6
- 1
include/ibrowse.hrl 查看文件

@ -12,10 +12,15 @@
host_type % 'hostname', 'ipv4_address' or 'ipv6_address'
}).
-record(lb_pid, {host_port, pid}).
-record(lb_pid, {host_port, pid, ets_tid}).
-record(client_conn, {key, cur_pipeline_size = 0, reqs_served = 0}).
-record(ibrowse_conf, {key, value}).
-define(CONNECTIONS_LOCAL_TABLE, ibrowse_lb).
-define(LOAD_BALANCER_NAMED_TABLE, ibrowse_lb).
-define(CONF_TABLE, ibrowse_conf).
-define(STREAM_TABLE, ibrowse_stream).
-endif.

二進制
rebar 查看文件


+ 90
- 65
src/ibrowse.erl 查看文件

@ -160,7 +160,7 @@ stop() ->
%% respHeader() = {headerName(), headerValue()}
%% headerName() = string()
%% headerValue() = string()
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
%% req_id() = term()
%% ResponseBody = string() | {file, Filename}
%% Reason = term()
@ -254,6 +254,11 @@ send_req(Url, Headers, Method, Body) ->
%% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li>
%%
%% <li> The <code>preserve_status_line</code> option is to get the raw status line as a custom header
%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
%% If both the <code>give_raw_headers</code> and <code>preserve_status_line</code> are specified
%% in a request, only the <code>give_raw_headers</code> is honoured. </li>
%%
%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
%% to receive the raw data stream when the Transfer-Encoding of the server
%% response is Chunked.
@ -340,7 +345,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Max_attempts, 0);
Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), Max_attempts, 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
@ -349,29 +354,41 @@ try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Max_attempts, Try_count) when Try_count < Max_attempts ->
Headers, Method, Body, Options_1, Timeout,
Ori_timeout, Req_start_time, Max_attempts, Try_count) when Try_count < Max_attempts ->
ProcessOptions = get_value(worker_process_options, Options_1, []),
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
ProcessOptions) of
{ok, Conn_Pid} ->
{ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
case do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout) of
{error, sel_conn_closed} ->
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Max_attempts, Try_count + 1);
Time_now = os:timestamp(),
Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)),
Time_remaining = Ori_timeout - Time_taken_so_far,
Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)),
%% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]),
case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
true ->
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1,
Time_remaining, Ori_timeout, Req_start_time, Max_attempts, Try_count + 1);
false ->
{error, retry_later}
end;
Res ->
Res
end;
Err ->
Err
end;
try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _) ->
try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.
merge_options(Host, Port, Options) ->
@ -458,14 +475,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Headers, Method, ensure_bin(Body),
Options, Timeout) of
{'EXIT', {timeout, _}} ->
P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
[_|_] = Conn_Pid_info_list ->
Conn_Pid_info_list;
_ ->
process_info_not_available
end,
(catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
[[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])),
{error, req_timedout};
{'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
{error, sel_conn_closed};
{'EXIT', {normal, _}} ->
{error, sel_conn_closed};
{'EXIT', {connection_closed, _}} ->
{'EXIT', {normal, _}} = Ex_rsn ->
(catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
[[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])),
{error, req_timedout};
{error, X} when X == connection_closed;
X == {send_failed, {error, enotconn}};
X == {send_failed,{error,einval}};
X == {send_failed,{error,closed}};
X == connection_closing;
((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
{error, sel_conn_closed};
{error, connection_closed} ->
{error, connection_closed_no_retry} ->
{error, connection_closed};
{error, {'EXIT', {noproc, _}}} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
@ -636,7 +668,7 @@ show_dest_status() ->
io:format("~80.80.=s~n", [""]),
Metrics = get_metrics(),
lists:foreach(
fun({Host, Port, Lb_pid, Tid, Size}) ->
fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
[Host ++ ":" ++ integer_to_list(Port),
integer_to_list(Tid),
@ -654,74 +686,68 @@ show_dest_status(Url) ->
%% included.
show_dest_status(Host, Port) ->
case get_metrics(Host, Port) of
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_speculative_sz},
{Last_p_sz, Last_speculative_sz}}} ->
{Lb_pid, MsgQueueSize,
Tid, Size,
{{First_p_sz, First_p_sz},
{Last_p_sz, Last_p_sz}}} ->
io:format("Load Balancer Pid : ~p~n"
"LB process msg q size : ~p~n"
"LB ETS table id : ~p~n"
"Num Connections : ~p~n"
"Smallest pipeline : ~p:~p~n"
"Largest pipeline : ~p:~p~n",
"Smallest pipeline : ~p~n"
"Largest pipeline : ~p~n",
[Lb_pid, MsgQueueSize, Tid, Size,
First_p_sz, First_speculative_sz,
Last_p_sz, Last_speculative_sz]);
First_p_sz, Last_p_sz]);
_Err ->
io:format("Metrics not available~n", [])
end.
get_metrics() ->
Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
is_integer(Port) ->
true;
(_) ->
false
end, ets:tab2list(ibrowse_lb)),
All_ets = ets:all(),
lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) ->
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, All_ets) of
[] ->
{Host, Port, Lb_pid, unknown, 0};
[Tid | _] ->
Size = case catch (ets:info(Tid, size)) of
N when is_integer(N) -> N;
_ -> 0
end,
{Host, Port, Lb_pid, Tid, Size}
end
end, Dests).
Dests = lists:filter(
fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host),
is_integer(Port) ->
true;
(_) ->
false
end, ets:tab2list(ibrowse_lb)),
lists:foldl(
fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
case get_metrics(X_host, X_port) of
{_, _, _, _, _} = X_res ->
[{X_host, X_port, X_res} | X_acc];
_X_res ->
X_acc
end
end, [], Dests).
get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
%% {Lb_pid, MsgQueueSize,
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, ets:all()) of
[] ->
{Lb_pid, MsgQueueSize, unknown, 0, unknown};
[Tid | _] ->
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of
{message_queue_len, Msg_q_len} ->
Msg_q_len;
_ ->
-1
end,
case Tid of
undefined ->
{Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
_ ->
try
Size = ets:info(Tid, size),
case Size of
0 ->
ok;
{Lb_pid, MsgQueueSize, Tid, 0, {{0, 0}, {0, 0}}};
_ ->
First = ets:first(Tid),
Last = ets:last(Tid),
[{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First),
[{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last),
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}}
{First_p_sz, _, _} = ets:first(Tid),
{Last_p_sz, _, _} = ets:last(Tid),
{Lb_pid, MsgQueueSize,
Tid, Size,
{{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}}
end
catch _:_ ->
catch _:_Err ->
not_available
end
end
@ -961,7 +987,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
do_get_connection(#url{host = Host, port = Port}, []) ->
{ok, Pid} = ibrowse_lb:start_link([Host, Port]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = Pid}),
Pid;
do_get_connection(_Url, [#lb_pid{pid = Pid}]) ->
Pid.

+ 158
- 93
src/ibrowse_http_client.erl 查看文件

@ -53,7 +53,8 @@
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
lb_ets_tid, cur_pipeline_size = 0, prev_req_id,
proc_state
}).
-record(request, {url, method, options, from,
@ -73,6 +74,12 @@
-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
%% Macros to prevent spelling mistakes causing bugs
-define(dont_retry_pipelined_requests, dont_retry_pipelined_requests).
-define(can_retry_pipelined_requests, can_retry_pipelined_requests).
-define(dead_proc_walking, dead_proc_walking).
%%====================================================================
%% External functions
%%====================================================================
@ -102,9 +109,15 @@ stop(Conn_pid) ->
end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
Conn_Pid,
{send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout).
case catch gen_server:call(Conn_Pid,
{send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout) of
{'EXIT', {timeout, _}} ->
{error, req_timedout};
{'EXIT', {noproc, _}} ->
{error, connection_closed};
Res ->
Res
end.
%%====================================================================
%% Server functions
@ -119,6 +132,7 @@ send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
%% {stop, Reason}
%%--------------------------------------------------------------------
init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
process_flag(trap_exit, true),
State = #state{host = Host,
port = Port,
ssl_options = SSLOptions,
@ -128,6 +142,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, set_inac_timer(State)};
init(Url) when is_list(Url) ->
process_flag(trap_exit, true),
case catch ibrowse_lib:parse_url(Url) of
#url{protocol = Protocol} = Url_rec ->
init({undefined, Url_rec, {[], Protocol == https}});
@ -135,6 +150,7 @@ init(Url) when is_list(Url) ->
{error, invalid_url}
end;
init({Host, Port}) ->
process_flag(trap_exit, true),
State = #state{host = Host,
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
@ -156,6 +172,10 @@ init({Host, Port}) ->
handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
{reply, {error, connection_closing}, State};
handle_call({send_req, _}, _From, #state{proc_state = ?dead_proc_walking} = State) ->
shutting_down(State),
{reply, {error, connection_closing}, State};
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From, State) ->
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
@ -207,30 +227,40 @@ handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
handle_info({stream_close, _Req_id}, State) ->
shutting_down(State),
do_close(State),
do_error_reply(State, closing_on_request),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_close(State_1),
do_error_reply(State_1, closing_on_request),
delayed_stop_timer(),
{noreply, State_1};
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?can_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({ssl_closed, _Sock}, State) ->
do_trace("SSL connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?can_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({tcp_error, _Sock, Reason}, State) ->
do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({ssl_error, _Sock, Reason}, State) ->
do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n",
[State#state.host, State#state.port, Reason]),
handle_sock_closed(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
delayed_stop_timer(),
{noreply, State_1};
handle_info({req_timedout, From}, State) ->
case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of
@ -238,21 +268,28 @@ handle_info({req_timedout, From}, State) ->
{noreply, State};
{value, #request{stream_to = StreamTo, req_id = ReqId}} ->
catch StreamTo ! {ibrowse_async_response_timeout, ReqId},
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State}
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, req_timedout),
delayed_stop_timer(),
{noreply, State_1}
end;
handle_info(timeout, State) ->
do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, req_timedout),
delayed_stop_timer(),
{noreply, State_1};
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
handle_info(delayed_stop, State) ->
{stop, normal, State};
handle_info(Info, State) ->
io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
[State#state.host, State#state.port, Info]),
@ -264,8 +301,10 @@ handle_info(Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
terminate(_Reason, #state{lb_ets_tid = Tid} = State) ->
do_close(State),
shutting_down(State),
(catch ets:select_delete(Tid, [{{{'_','_','$1'},'_'},[{'==','$1',{const,self()}}],[true]}])),
ok.
%%--------------------------------------------------------------------
@ -285,16 +324,20 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
shutting_down(State),
do_error_reply(State, data_in_status_idle),
do_close(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
do_error_reply(State_1, data_in_status_idle),
do_close(State_1),
delayed_stop_timer(),
{noreply, State_1};
handle_sock_data(Data, #state{status = get_header}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
shutting_down(State),
{stop, normal, State};
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
delayed_stop_timer(),
{noreply, State_1};
#state{socket = Socket, status = Status, cur_req = CurReq} = State_1 ->
_ = case {Status, CurReq} of
{get_header, #request{caller_controls_socket = true}} ->
@ -315,10 +358,12 @@ handle_sock_data(Data, #state{status = get_body,
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
shutting_down(State),
fail_pipelined_requests(State,
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
delayed_stop_timer(),
{noreply, State_1};
State_1 ->
_ = active_once(State_1),
State_2 = set_inac_timer(State_1),
@ -327,10 +372,12 @@ handle_sock_data(Data, #state{status = get_body,
_ ->
case parse_11_response(Data, State) of
{error, Reason} ->
shutting_down(State),
fail_pipelined_requests(State,
State_1 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_1),
fail_pipelined_requests(State_1,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
delayed_stop_timer(),
{noreply, State_1};
#state{cur_req = #request{caller_controls_socket = Ccs},
interim_reply_sent = Irs} = State_1 ->
_ = case Irs of
@ -451,11 +498,11 @@ file_mode(_Srtf) -> write.
%%--------------------------------------------------------------------
%% Handles the case when the server closes the socket
%%--------------------------------------------------------------------
handle_sock_closed(#state{status=get_header} = State) ->
handle_sock_closed(#state{status=get_header} = State, _) ->
shutting_down(State),
do_error_reply(State, connection_closed);
do_error_reply(State, connection_closed_no_retry);
handle_sock_closed(#state{cur_req=undefined} = State) ->
handle_sock_closed(#state{cur_req=undefined} = State, _) ->
shutting_down(State);
%% We check for IsClosing because this the server could have sent a
@ -469,7 +516,7 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
recvd_headers = Headers,
status_line = Status_line,
raw_headers = Raw_headers
}=State) ->
}=State, Retry_state) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
options = Options,
@ -497,30 +544,35 @@ handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC
{ok, SC, Headers, Buf, Raw_req}
end,
State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed),
case Retry_state of
?dont_retry_pipelined_requests ->
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed_no_retry);
?can_retry_pipelined_requests ->
ok = do_error_reply(State_1#state{reqs = Reqs_1}, connection_closed)
end,
State_1;
_ ->
ok = do_error_reply(State, connection_closed),
case Retry_state of
?dont_retry_pipelined_requests ->
ok = do_error_reply(State, connection_closed_no_retry);
?can_retry_pipelined_requests ->
ok = do_error_reply(State, connection_closed)
end,
State
end.
do_connect(Host, Port, Options, State, Timeout) ->
SockOptions = get_sock_options(Host, Options, State#state.ssl_options),
case {get_value(socks5_host, Options, undefined), State#state.is_ssl} of
{undefined, true} ->
ssl:connect(Host, Port, SockOptions, Timeout);
{undefined, false} ->
gen_tcp:connect(Host, Port, SockOptions, Timeout);
{_, _} ->
case {ibrowse_socks5:connect(Host, Port, Options, SockOptions, Timeout),
State#state.is_ssl} of
{{ok, Socket}, true} ->
ssl:connect(Socket, SockOptions, Timeout);
{{ok, Socket}, false} ->
{ok, Socket};
{Else, _} ->
Else
end
do_connect(Host, Port, Options, #state{is_ssl = true,
use_proxy = false,
ssl_options = SSLOptions},
Timeout) ->
ssl:connect(Host, Port, get_sock_options(Host, Options, SSLOptions), Timeout);
do_connect(Host, Port, Options, _State, Timeout) ->
Socks5Host = get_value(socks5_host, Options, undefined),
case Socks5Host of
undefined ->
gen_tcp:connect(Host, Port, get_sock_options(Host, Options, []), Timeout);
_ ->
catch ibrowse_socks5:connect(Host, Port, Options)
end.
get_sock_options(Host, Options, SSLOptions) ->
@ -710,10 +762,12 @@ send_req_1(From,
connect_timeout = Conn_timeout},
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
State_3 = State_2#state{proc_state = ?dead_proc_walking},
shutting_down(State_3),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
delayed_stop_timer(),
{noreply, State_3}
end;
%% Send a CONNECT request.
@ -765,16 +819,20 @@ send_req_1(From,
State_3 = set_inac_timer(State_2),
{noreply, State_3};
Err ->
shutting_down(State_1),
State_2 = State_1#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
delayed_stop_timer(),
{noreply, State_2}
end;
Err ->
shutting_down(State_1),
State_2 = State_1#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
delayed_stop_timer(),
{noreply, State_2}
end;
send_req_1(From, Url, Headers, Method, Body, Options, Timeout,
@ -873,16 +931,20 @@ send_req_1(From,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
shutting_down(State),
State_2 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State}
delayed_stop_timer(),
{noreply, State_2}
end;
Err ->
shutting_down(State),
State_2 = State#state{proc_state = ?dead_proc_walking},
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State}
delayed_stop_timer(),
{noreply, State_2}
end.
maybe_modify_headers(#url{}, connect, _, Headers, State) ->
@ -1441,7 +1503,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@ -1468,7 +1530,7 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
reply_buffer = RepBuf
} = State) ->
Body = RepBuf,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options),
Give_raw_req = get_value(return_raw_request, Options, false),
Reply = case get_value(give_raw_headers, Options, false) of
true when Give_raw_req == false ->
@ -1773,7 +1835,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
recvd_headers = Headers, http_status_code = StatCode,
cur_req = #request{options = Opts}
}) ->
{Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
{Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
@ -1781,7 +1843,7 @@ send_async_headers(ReqId, StreamTo, Give_raw_headers,
catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) ->
Custom_headers = get_value(add_custom_headers, Opts, []),
Headers_1 = Headers ++ Custom_headers,
Raw_headers_1 = case Custom_headers of
@ -1791,7 +1853,12 @@ maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
_ ->
Raw_headers
end,
{Headers_1, Raw_headers_1}.
case get_value(preserve_status_line, Opts, false) of
true ->
{[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1};
false ->
{Headers_1, Raw_headers_1}
end.
format_response_data(Resp_format, Body) ->
case Resp_format of
@ -1945,34 +2012,29 @@ shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
cur_pipeline_size = _Sz}) ->
catch ets:delete(Tid, self()).
(catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
update_counter(Tid, self(), {2,1,99999,9999}),
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
State#state{cur_pipeline_size = Pipe_sz + 1}.
update_counter(Tid, Key, Args) ->
ets:update_counter(Tid, Key, Args).
dec_pipeline_counter(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
_ = try
update_counter(Tid, self(), {2,-1,0,0}),
update_counter(Tid, self(), {3,-1,0,0})
catch
_:_ ->
ok
end,
State#state{cur_pipeline_size = Pipe_sz - 1}.
lb_ets_tid = Tid,
proc_state = Proc_state} = State) when Tid /= undefined,
Proc_state /= ?dead_proc_walking ->
Ts = os:timestamp(),
catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
(catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
[{'==', '$1', {const,self()}},
{'<', '$2', {const,Ts}}
],
[true]}])),
State#state{cur_pipeline_size = Pipe_sz - 1};
dec_pipeline_counter(State) ->
State.
flatten([H | _] = L) when is_integer(H) ->
L;
@ -2057,3 +2119,6 @@ get_header_value(Name, Headers, Default_val) ->
{value, {_, Val}} ->
Val
end.
delayed_stop_timer() ->
erlang:send_after(500, self(), delayed_stop).

+ 39
- 69
src/ibrowse_lb.erl 查看文件

@ -36,7 +36,6 @@
port,
max_sessions,
max_pipeline_size,
num_cur_sessions = 0,
proc_state
}).
@ -70,13 +69,13 @@ init([Host, Port]) ->
Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
{ok, #state{parent_pid = whereis(ibrowse),
State = #state{parent_pid = whereis(ibrowse),
host = Host,
port = Port,
ets_tid = Tid,
max_pipeline_size = Max_pipe_sz,
max_sessions = Max_sessions}}.
max_sessions = Max_sessions},
State_1 = maybe_create_ets(State),
{ok, State_1}.
spawn_connection(Lb_pid, Url,
Max_sessions,
@ -113,34 +112,30 @@ handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
{stop, normal, State};
handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
ets:foldl(fun({Pid, _, _}, Acc) ->
ibrowse_http_client:stop(Pid),
Acc
end, [], Tid),
stop_all_conn_procs(Tid),
gen_server:reply(_From, ok),
{stop, normal, State};
handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
{reply, {error, shutting_down}, State};
%% Update max_sessions in #state with supplied value
handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _, _}, _From,
#state{num_cur_sessions = Num} = State)
when Num >= Max_sess ->
State_1 = maybe_create_ets(State),
Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
#state{num_cur_sessions = Cur} = State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
ets:insert(Tid, {Pid, 0, 0}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
Tid_size = ets:info(Tid, size),
case Tid_size >= Max_sess of
true ->
Reply = find_best_connection(Tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}};
false ->
{ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
Ts = os:timestamp(),
ets:insert(Tid, {{1, Ts, Pid}, []}),
{reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions = Max_sess,
max_pipeline_size = Max_pipe}}
end;
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
@ -163,24 +158,6 @@ handle_cast(_Msg, State) ->
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
{stop, normal, State};
handle_info({'EXIT', _Pid, _Reason}, #state{ets_tid = undefined} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, _Reason},
#state{num_cur_sessions = Cur,
ets_tid = Tid} = State) ->
ets:match_delete(Tid, {{'_', Pid}, '_'}),
Cur_1 = Cur - 1,
case Cur_1 of
0 ->
ets:delete(Tid),
{noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
_ ->
{noreply, State#state{num_cur_sessions = Cur_1}}
end;
handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
put(my_trace_flag, Bool),
@ -216,12 +193,17 @@ handle_info(_Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, #state{host = Host, port = Port}) ->
% Use delete_object instead of delete in case another process for this host/port
% has been spawned, in which case will be deleting the wrong record because pid won't match.
ets:delete_object(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self()}),
terminate(_Reason, #state{host = Host, port = Port, ets_tid = Tid} = _State) ->
catch ets:delete(ibrowse_lb, {Host, Port}),
stop_all_conn_procs(Tid),
ok.
stop_all_conn_procs(Tid) ->
ets:foldl(fun({{_, _, Pid}, _}, Acc) ->
ibrowse_http_client:stop(Pid),
Acc
end, [], Tid).
%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
@ -234,30 +216,18 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
ets:safe_fixtable(Tid, true),
Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
ets:safe_fixtable(Tid, false),
Res.
find_best_connection('$end_of_table', _, _) ->
{error, retry_later};
find_best_connection(Pid, Tid, Max_pipe) ->
case ets:lookup(Tid, Pid) of
[{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
Speculative_sz < Max_pipe ->
case catch ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}) of
{'EXIT', _} ->
%% The selected process has shutdown
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe);
_ ->
{ok, Pid}
end;
_ ->
find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
case ets:first(Tid) of
{Spec_size, Ts, Pid} = First when Spec_size < Max_pipe ->
ets:delete(Tid, First),
ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
{ok, First};
_ ->
{error, retry_later}
end.
maybe_create_ets(#state{ets_tid = undefined} = State) ->
maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}),
State#state{ets_tid = Tid};
maybe_create_ets(State) ->
State.

+ 7
- 3
src/ibrowse_lib.erl 查看文件

@ -28,7 +28,8 @@
get_value/2,
get_value/3,
parse_url/1,
printable_date/0
printable_date/0,
printable_date/1
]).
get_trace_status(Host, Port) ->
@ -368,8 +369,11 @@ default_port(https) -> 443;
default_port(ftp) -> 21.
printable_date() ->
{{Y,Mo,D},{H, M, S}} = calendar:local_time(),
{_,_,MicroSecs} = os:timestamp(),
printable_date(os:timestamp()).
printable_date(Now) ->
{{Y,Mo,D},{H, M, S}} = calendar:now_to_local_time(Now),
{_,_,MicroSecs} = Now,
[integer_to_list(Y),
$-,
integer_to_list(Mo),

+ 174
- 0
test/ibrowse_functional_tests.erl 查看文件

@ -0,0 +1,174 @@
%%% File : ibrowse_functional_tests.erl
%%% Authors : Benjamin Lee <http://github.com/benjaminplee>
%%% Dan Schwabe <http://github.com/dfschwabe>
%%% Brian Richards <http://github.com/richbria>
%%% Description : Functional tests of the ibrowse library using a live test HTTP server
%%% Created : 18 November 2014 by Benjamin Lee <yardspoon@gmail.com>
-module(ibrowse_functional_tests).
-include_lib("eunit/include/eunit.hrl").
-define(PER_TEST_TIMEOUT_SEC, 60).
-define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}).
-define(SERVER_PORT, 8181).
-define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)).
-define(SHORT_TIMEOUT_MS, 5000).
-define(LONG_TIMEOUT_MS, 30000).
-define(PAUSE_FOR_CONNECTIONS_MS, 2000).
-compile(export_all).
setup() ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
ibrowse_test_server:start_server(?SERVER_PORT, tcp),
ibrowse:start(),
ok.
teardown(_) ->
ibrowse:stop(),
ibrowse_test_server:stop_server(?SERVER_PORT),
ok.
running_server_fixture_test_() ->
{foreach,
fun setup/0,
fun teardown/1,
[
?TIMEDTEST("Simple request can be honored", simple_request),
?TIMEDTEST("Slow server causes timeout", slow_server_timeout),
?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth),
?TIMEDTEST("Pipelines refill", pipeline_refill),
?TIMEDTEST("Timeout closes pipe", closing_pipes),
?TIMEDTEST("Requests are balanced over connections", balanced_connections),
?TIMEDTEST("Pipeline too small signals retries", small_pipeline),
?TIMEDTEST("Dest status can be gathered", status)
]
}.
simple_request() ->
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])).
slow_server_timeout() ->
?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)).
pipeline_depth() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsSent = 2,
EmptyPipelineDepth = 0,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts).
pipeline_refill() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsToFill = MaxSessions * MaxPipeline,
%% Send off enough requests to fill sessions and pipelines in rappid succession
Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsToFill, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
% Verify that connections properly reported their completed responses and can still accept more
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)),
% and do it again to make sure we really are clear
times(RequestsToFill, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
% Verify that connections properly reported their completed responses and can still accept more
?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)).
closing_pipes() ->
MaxSessions = 2,
MaxPipeline = 2,
RequestsSent = 2,
BalancedNumberOfRequestsPerConnection = 1,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts),
timer:sleep(?SHORT_TIMEOUT_MS),
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()).
balanced_connections() ->
MaxSessions = 4,
MaxPipeline = 100,
RequestsSent = 80,
BalancedNumberOfRequestsPerConnection = 20,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn_link(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
small_pipeline() ->
MaxSessions = 10,
MaxPipeline = 10,
RequestsSent = 100,
FullRequestsPerConnection = 10,
?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line
ibrowse:show_dest_status("localhost", 8181),
Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
?assertEqual(MaxSessions, length(Counts)),
?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts),
Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS),
?assertEqual({error, retry_later}, Response).
status() ->
MaxSessions = 10,
MaxPipeline = 10,
RequestsSent = 100,
Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
times(RequestsSent, fun() -> spawn(Fun) end),
timer:sleep(?PAUSE_FOR_CONNECTIONS_MS), %% Wait for everyone to get in line
ibrowse:show_dest_status(),
ibrowse:show_dest_status("http://localhost:8181").
times(0, _) ->
ok;
times(X, Fun) ->
Fun(),
times(X - 1, Fun).

+ 181
- 0
test/ibrowse_load_test.erl 查看文件

@ -0,0 +1,181 @@
-module(ibrowse_load_test).
-compile(export_all).
-define(ibrowse_load_test_counters, ibrowse_load_test_counters).
start(Num_workers, Num_requests, Max_sess) ->
proc_lib:spawn(fun() ->
start_1(Num_workers, Num_requests, Max_sess)
end).
query_state() ->
ibrowse_load_test ! query_state.
shutdown() ->
ibrowse_load_test ! shutdown.
start_1(Num_workers, Num_requests, Max_sess) ->
register(ibrowse_load_test, self()),
application:start(ibrowse),
application:set_env(ibrowse, inactivity_timeout, 5000),
Ulimit = os:cmd("ulimit -n"),
case catch list_to_integer(string:strip(Ulimit, right, $\n)) of
X when is_integer(X), X > 3000 ->
ok;
X ->
io:format("Load test not starting. {insufficient_value_for_ulimit, ~p}~n", [X]),
exit({insufficient_value_for_ulimit, X})
end,
ets:new(?ibrowse_load_test_counters, [named_table, public]),
ets:new(ibrowse_load_timings, [named_table, public]),
try
ets:insert(?ibrowse_load_test_counters, [{success, 0},
{failed, 0},
{timeout, 0},
{retry_later, 0},
{one_request_only, 0}
]),
ibrowse:set_max_sessions("localhost", 8081, Max_sess),
Start_time = now(),
Workers = spawn_workers(Num_workers, Num_requests),
erlang:send_after(1000, self(), print_diagnostics),
ok = wait_for_workers(Workers),
End_time = now(),
Time_in_secs = trunc(round(timer:now_diff(End_time, Start_time) / 1000000)),
Req_count = Num_workers * Num_requests,
[{_, Success_count}] = ets:lookup(?ibrowse_load_test_counters, success),
case Success_count == Req_count of
true ->
io:format("Test success. All requests succeeded~n", []);
false when Success_count > 0 ->
io:format("Test failed. Some successes~n", []);
false ->
io:format("Test failed. ALL requests FAILED~n", [])
end,
case Time_in_secs > 0 of
true ->
io:format("Reqs/sec achieved : ~p~n", [trunc(round(Success_count / Time_in_secs))]);
false ->
ok
end,
io:format("Load test results:~n~p~n", [ets:tab2list(?ibrowse_load_test_counters)]),
io:format("Timings: ~p~n", [calculate_timings()])
catch Err ->
io:format("Err: ~p~n", [Err])
after
ets:delete(?ibrowse_load_test_counters),
ets:delete(ibrowse_load_timings),
unregister(ibrowse_load_test)
end.
calculate_timings() ->
{Max, Min, Mean} = get_mmv(ets:first(ibrowse_load_timings), {0, 9999999, 0}),
Variance = trunc(round(ets:foldl(fun({_, X}, X_acc) ->
(X - Mean)*(X-Mean) + X_acc
end, 0, ibrowse_load_timings) / ets:info(ibrowse_load_timings, size))),
Std_dev = trunc(round(math:sqrt(Variance))),
{ok, [{max, Max},
{min, Min},
{mean, Mean},
{variance, Variance},
{standard_deviation, Std_dev}]}.
get_mmv('$end_of_table', {Max, Min, Total}) ->
Mean = trunc(round(Total / ets:info(ibrowse_load_timings, size))),
{Max, Min, Mean};
get_mmv(Key, {Max, Min, Total}) ->
[{_, V}] = ets:lookup(ibrowse_load_timings, Key),
get_mmv(ets:next(ibrowse_load_timings, Key), {max(Max, V), min(Min, V), Total + V}).
spawn_workers(Num_w, Num_r) ->
spawn_workers(Num_w, Num_r, self(), []).
spawn_workers(0, _Num_requests, _Parent, Acc) ->
lists:reverse(Acc);
spawn_workers(Num_workers, Num_requests, Parent, Acc) ->
Pid_ref = spawn_monitor(fun() ->
random:seed(now()),
case catch worker_loop(Parent, Num_requests) of
{'EXIT', Rsn} ->
io:format("Worker crashed with reason: ~p~n", [Rsn]);
_ ->
ok
end
end),
spawn_workers(Num_workers - 1, Num_requests, Parent, [Pid_ref | Acc]).
wait_for_workers([]) ->
ok;
wait_for_workers([{Pid, Pid_ref} | T] = Pids) ->
receive
{done, Pid} ->
wait_for_workers(T);
{done, Some_pid} ->
wait_for_workers([{Pid, Pid_ref} | lists:keydelete(Some_pid, 1, T)]);
print_diagnostics ->
io:format("~1000.p~n", [ibrowse:get_metrics()]),
erlang:send_after(1000, self(), print_diagnostics),
wait_for_workers(Pids);
query_state ->
io:format("Waiting for ~p~n", [Pids]),
wait_for_workers(Pids);
shutdown ->
io:format("Shutting down on command. Still waiting for ~p workers~n", [length(Pids)]);
{'DOWN', _, process, _, normal} ->
wait_for_workers(Pids);
{'DOWN', _, process, Down_pid, Rsn} ->
io:format("Worker ~p died. Reason: ~p~n", [Down_pid, Rsn]),
wait_for_workers(lists:keydelete(Down_pid, 1, Pids));
X ->
io:format("Recvd unknown msg: ~p~n", [X]),
wait_for_workers(Pids)
end.
worker_loop(Parent, 0) ->
Parent ! {done, self()};
worker_loop(Parent, N) ->
Delay = random:uniform(100),
Url = case Delay rem 10 of
%% Change 10 to some number between 0-9 depending on how
%% much chaos you want to introduce into the server
%% side. The higher the number, the more often the
%% server will close a connection after serving the
%% first request, thereby forcing the client to
%% retry. Any number of 10 or higher will disable this
%% chaos mechanism
10 ->
ets:update_counter(?ibrowse_load_test_counters, one_request_only, 1),
"http://localhost:8081/ibrowse_handle_one_request_only";
_ ->
"http://localhost:8081/blah"
end,
Start_time = now(),
Res = ibrowse:send_req(Url, [], get),
End_time = now(),
Time_taken = trunc(round(timer:now_diff(End_time, Start_time) / 1000)),
ets:insert(ibrowse_load_timings, {now(), Time_taken}),
case Res of
{ok, "200", _, _} ->
ets:update_counter(?ibrowse_load_test_counters, success, 1);
{error, req_timedout} ->
ets:update_counter(?ibrowse_load_test_counters, timeout, 1);
{error, retry_later} ->
ets:update_counter(?ibrowse_load_test_counters, retry_later, 1);
{error, Reason} ->
update_unknown_counter(Reason, 1);
_ ->
io:format("~p -- Res: ~p~n", [self(), Res]),
ets:update_counter(?ibrowse_load_test_counters, failed, 1)
end,
timer:sleep(Delay),
worker_loop(Parent, N - 1).
update_unknown_counter(Counter, Inc_val) ->
case catch ets:update_counter(?ibrowse_load_test_counters, Counter, Inc_val) of
{'EXIT', _} ->
ets:insert_new(?ibrowse_load_test_counters, {Counter, 0}),
update_unknown_counter(Counter, Inc_val);
_ ->
ok
end.

+ 126
- 29
test/ibrowse_test.erl 查看文件

@ -8,9 +8,10 @@
load_test/3,
send_reqs_1/3,
do_send_req/2,
local_unit_tests/0,
unit_tests/0,
unit_tests/1,
unit_tests_1/2,
unit_tests/2,
unit_tests_1/3,
ue_test/0,
ue_test/1,
verify_chunked_streaming/0,
@ -34,9 +35,13 @@
test_303_response_with_a_body/1,
test_binary_headers/0,
test_binary_headers/1,
test_generate_body_0/0
test_generate_body_0/0,
test_retry_of_requests/0,
test_retry_of_requests/1
]).
-include_lib("ibrowse/include/ibrowse.hrl").
test_stream_once(Url, Method, Options) ->
test_stream_once(Url, Method, Options, 5000).
@ -90,6 +95,8 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) ->
ets:new(pid_table, [named_table, public]),
ets:new(ibrowse_test_results, [named_table, public]),
ets:new(ibrowse_errors, [named_table, public, ordered_set]),
ets:new(ibrowse_counter, [named_table, public, ordered_set]),
ets:insert(ibrowse_counter, {req_id, 1}),
init_results(),
process_flag(trap_exit, true),
log_msg("Starting spawning of workers...~n", []),
@ -207,6 +214,16 @@ dump_errors(Key, Iod) ->
%%------------------------------------------------------------------------------
%% Unit Tests
%%------------------------------------------------------------------------------
-define(LOCAL_TESTS, [
{local_test_fun, test_20122010, []},
{local_test_fun, test_pipeline_head_timeout, []},
{local_test_fun, test_head_transfer_encoding, []},
{local_test_fun, test_head_response_with_body, []},
{local_test_fun, test_303_response_with_a_body, []},
{local_test_fun, test_binary_headers, []},
{local_test_fun, test_retry_of_requests, []}
]).
-define(TEST_LIST, [{"http://intranet/messenger", get},
{"http://www.google.co.uk", get},
{"http://www.google.com", get},
@ -236,27 +253,27 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
{"http://jigsaw.w3.org/HTTP/CL/", get},
{"http://www.httpwatch.com/httpgallery/chunked/", get},
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
{local_test_fun, test_20122010, []},
{local_test_fun, test_pipeline_head_timeout, []},
{local_test_fun, test_head_transfer_encoding, []},
{local_test_fun, test_head_response_with_body, []},
{local_test_fun, test_303_response_with_a_body, []},
{local_test_fun, test_binary_headers, []}
]).
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]}
] ++ ?LOCAL_TESTS).
local_unit_tests() ->
unit_tests([], ?LOCAL_TESTS).
unit_tests() ->
unit_tests([]).
error_logger:tty(false),
unit_tests([], ?TEST_LIST),
error_logger:tty(true).
unit_tests(Options) ->
unit_tests(Options, Test_list) ->
application:start(crypto),
application:start(asn1),
application:start(public_key),
application:start(ssl),
(catch ibrowse_test_server:start_server(8181, tcp)),
ibrowse:start(),
application:start(ibrowse),
Options_1 = Options ++ [{connect_timeout, 5000}],
Test_timeout = proplists:get_value(test_timeout, Options, 60000),
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1, Test_list]),
receive
{done, Pid} ->
ok;
@ -269,14 +286,14 @@ unit_tests(Options) ->
catch ibrowse_test_server:stop_server(8181),
ok.
unit_tests_1(Parent, Options) ->
unit_tests_1(Parent, Options, Test_list) ->
lists:foreach(fun({local_test_fun, Fun_name, Args}) ->
execute_req(local_test_fun, Fun_name, Args);
({Url, Method}) ->
execute_req(Url, Method, Options);
({Url, Method, X_Opts}) ->
execute_req(Url, Method, X_Opts ++ Options)
end, ?TEST_LIST),
end, Test_list),
Parent ! {done, self()}.
verify_chunked_streaming() ->
@ -371,6 +388,8 @@ wait_for_resp(Pid) ->
{'EXIT', Reason};
{'DOWN', _, _, _, _} ->
wait_for_resp(Pid);
{'EXIT', _, normal} ->
wait_for_resp(Pid);
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
@ -425,6 +444,7 @@ maybe_stream_next(Req_id, Options) ->
end.
execute_req(local_test_fun, Method, Args) ->
reset_ibrowse(),
io:format(" ~-54.54w: ", [Method]),
Result = (catch apply(?MODULE, Method, Args)),
io:format("~p~n", [Result]);
@ -538,6 +558,74 @@ test_303_response_with_a_body(Url) ->
{test_failed, Res}
end.
%%------------------------------------------------------------------------------
%% Test that retry of requests happens correctly, and that ibrowse doesn't retry
%% if there is not enough time left
%%------------------------------------------------------------------------------
test_retry_of_requests() ->
clear_msg_q(),
test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
test_retry_of_requests(Url) ->
reset_ibrowse(),
Timeout_1 = 2050,
Res_1 = test_retry_of_requests(Url, Timeout_1),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
true;
(_) -> false
end, Res_1) of
[_|_] = X ->
Res_1_1 = Res_1 -- X,
case lists:all(
fun({_Pid, {error, retry_later}}) ->
true;
(_) ->
false
end, Res_1_1) of
true ->
ok;
false ->
exit({failed, Timeout_1, Res_1})
end;
_ ->
exit({failed, Timeout_1, Res_1})
end,
Timeout_2 = 2200,
Res_2 = test_retry_of_requests(Url, Timeout_2),
case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
true;
(_) -> false
end, Res_2) of
[_|_] = Res_2_X ->
Res_2_1 = Res_2 -- Res_2_X,
case lists:all(
fun({_Pid, {error, X_err_2}}) ->
(X_err_2 == retry_later) orelse (X_err_2 == req_timedout);
(_) ->
false
end, Res_2_1) of
true ->
ok;
false ->
exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end;
_ ->
exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
end,
success.
test_retry_of_requests(Url, Timeout) ->
#url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
ibrowse:set_max_sessions(Host, Port, 1),
Parent = self(),
Pids = lists:map(fun(_) ->
spawn(fun() ->
Res = (catch ibrowse:send_req(Url, [], get, [], [], Timeout)),
Parent ! {self(), Res}
end)
end, lists:seq(1,10)),
accumulate_worker_resp(Pids).
%%------------------------------------------------------------------------------
%% Test what happens when the request at the head of a pipeline times out
%%------------------------------------------------------------------------------
@ -547,22 +635,27 @@ test_pipeline_head_timeout() ->
test_pipeline_head_timeout(Url) ->
{ok, Pid} = ibrowse:spawn_worker_process(Url),
Fixed_timeout = 2000,
Test_parent = self(),
Fun = fun({fixed, Timeout}) ->
spawn(fun() ->
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end);
(Timeout_mult) ->
spawn(fun() ->
Timeout = 1000 + Timeout_mult*1000,
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end)
end,
Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]],
X_pid = spawn(fun() ->
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end),
%% io:format("Pid ~p with a fixed timeout~n", [X_pid]),
X_pid;
(Timeout_mult) ->
Timeout = Fixed_timeout + Timeout_mult*1000,
X_pid = spawn(fun() ->
do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
end),
%% io:format("Pid ~p with a timeout of ~p~n", [X_pid, Timeout]),
X_pid
end,
Pids = [Fun(X) || X <- [{fixed, Fixed_timeout} | lists:seq(1,10)]],
Result = accumulate_worker_resp(Pids),
case lists:all(fun({_, X_res}) ->
X_res == {error,req_timedout}
end, Result) of
(X_res == {error,req_timedout}) orelse (X_res == {error, connection_closed})
end, Result) of
true ->
success;
false ->
@ -725,3 +818,7 @@ do_trace(true, Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]);
do_trace(_, _, _) ->
ok.
reset_ibrowse() ->
application:stop(ibrowse),
application:start(ibrowse).

+ 85
- 39
test/ibrowse_test_server.erl 查看文件

@ -6,40 +6,52 @@
-module(ibrowse_test_server).
-export([
start_server/2,
stop_server/1
stop_server/1,
get_conn_pipeline_depth/0
]).
-record(request, {method, uri, version, headers = [], body = []}).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
-define(ACCEPT_TIMEOUT_MS, 1000).
-define(CONN_PIPELINE_DEPTH, conn_pipeline_depth).
start_server(Port, Sock_type) ->
Fun = fun() ->
Name = server_proc_name(Port),
register(Name, self()),
case do_listen(Sock_type, Port, [{active, false},
{reuseaddr, true},
{nodelay, true},
{packet, http}]) of
{ok, Sock} ->
do_trace("Server listening on port: ~p~n", [Port]),
accept_loop(Sock, Sock_type);
Err ->
erlang:error(
lists:flatten(
io_lib:format(
"Failed to start server on port ~p. ~p~n",
[Port, Err]))),
exit({listen_error, Err})
end,
unregister(Name)
end,
Proc_name = server_proc_name(Port),
case whereis(Proc_name) of
undefined ->
register(Proc_name, self()),
ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
case do_listen(Sock_type, Port, [{active, false},
{reuseaddr, true},
{nodelay, true},
{packet, http}]) of
{ok, Sock} ->
do_trace("Server listening on port: ~p~n", [Port]),
accept_loop(Sock, Sock_type);
Err ->
erlang:error(
lists:flatten(
io_lib:format(
"Failed to start server on port ~p. ~p~n",
[Port, Err]))),
exit({listen_error, Err})
end;
_X ->
ok
end
end,
spawn_link(Fun).
stop_server(Port) ->
server_proc_name(Port) ! stop,
timer:sleep(2000), % wait for server to receive msg and unregister
ok.
get_conn_pipeline_depth() ->
ets:tab2list(?CONN_PIPELINE_DEPTH).
server_proc_name(Port) ->
list_to_atom("ibrowse_test_server_"++integer_to_list(Port)).
@ -51,24 +63,36 @@ do_listen(ssl, Port, Opts) ->
ssl:listen(Port, Opts).
do_accept(tcp, Listen_sock) ->
gen_tcp:accept(Listen_sock);
gen_tcp:accept(Listen_sock, ?ACCEPT_TIMEOUT_MS);
do_accept(ssl, Listen_sock) ->
ssl:ssl_accept(Listen_sock).
ssl:ssl_accept(Listen_sock, ?ACCEPT_TIMEOUT_MS).
accept_loop(Sock, Sock_type) ->
case do_accept(Sock_type, Sock) of
{ok, Conn} ->
Pid = spawn_link(
fun() ->
server_loop(Conn, Sock_type, #request{})
end),
Pid = spawn_link(fun() -> connection(Conn, Sock_type) end),
set_controlling_process(Conn, Sock_type, Pid),
Pid ! {setopts, [{active, true}]},
accept_loop(Sock, Sock_type);
{error, timeout} ->
receive
stop ->
ok
after 10 ->
accept_loop(Sock, Sock_type)
end;
Err ->
Err
end.
connection(Conn, Sock_type) ->
catch ets:insert(?CONN_PIPELINE_DEPTH, {self(), 0}),
try
server_loop(Conn, Sock_type, #request{})
after
catch ets:delete(?CONN_PIPELINE_DEPTH, self())
end.
set_controlling_process(Sock, tcp, Pid) ->
gen_tcp:controlling_process(Sock, Pid);
set_controlling_process(Sock, ssl, Pid) ->
@ -82,18 +106,26 @@ setopts(Sock, ssl, Opts) ->
server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
receive
{http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} ->
catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
server_loop(Sock, Sock_type, Req#request{method = HttpMethod,
uri = HttpUri,
version = HttpVersion});
{http, Sock, {http_header, _, _, _, _} = H} ->
server_loop(Sock, Sock_type, Req#request{headers = [H | Headers]});
{http, Sock, http_eoh} ->
process_request(Sock, Sock_type, Req),
case process_request(Sock, Sock_type, Req) of
close_connection ->
gen_tcp:shutdown(Sock, read_write);
not_done ->
ok;
_ ->
catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
end,
server_loop(Sock, Sock_type, #request{});
{http, Sock, {http_error, Err}} ->
do_trace("Error parsing HTTP request:~n"
"Req so far : ~p~n"
"Err : ", [Req, Err]),
io:format("Error parsing HTTP request:~n"
"Req so far : ~p~n"
"Err : ~p", [Req, Err]),
exit({http_error, Err});
{setopts, Opts} ->
setopts(Sock, Sock_type, Opts),
@ -101,12 +133,10 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
{tcp_closed, Sock} ->
do_trace("Client closed connection~n", []),
ok;
stop ->
ok;
Other ->
do_trace("Recvd unknown msg: ~p~n", [Other]),
io:format("Recvd unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
after 5000 ->
after 120000 ->
do_trace("Timing out client connection~n", []),
ok
end.
@ -145,7 +175,7 @@ process_request(Sock, Sock_type,
headers = _Headers,
uri = {abs_path, "/ibrowse_inac_timeout_test"}} = Req) ->
do_trace("Recvd req: ~p. Sleeping for 30 secs...~n", [Req]),
timer:sleep(30000),
timer:sleep(3000),
do_trace("...Sending response now.~n", []),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
@ -155,7 +185,6 @@ process_request(Sock, Sock_type,
uri = {abs_path, "/ibrowse_head_transfer_enc"}}) ->
Resp = <<"HTTP/1.1 400 Bad Request\r\nServer: Apache-Coyote/1.1\r\nContent-Length:5\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\n\r\nabcde">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='GET',
headers = Headers,
@ -178,7 +207,7 @@ process_request(Sock, Sock_type,
#request{method='HEAD',
headers = _Headers,
uri = {abs_path, "/ibrowse_head_test"}}) ->
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r \nTransfer-Encoding: chunked\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\Date: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='POST',
@ -192,17 +221,34 @@ process_request(Sock, Sock_type,
uri = {abs_path, "/ibrowse_303_with_body_test"}}) ->
Resp = <<"HTTP/1.1 303 See Other\r\nLocation: http://example.org\r\nContent-Length: 5\r\n\r\nabcde">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='GET',
headers = _Headers,
uri = {abs_path, "/ibrowse_handle_one_request_only_with_delay"}}) ->
timer:sleep(2000),
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp),
close_connection;
process_request(Sock, Sock_type,
#request{method='GET',
headers = _Headers,
uri = {abs_path, "/ibrowse_handle_one_request_only"}}) ->
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp),
close_connection;
process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) ->
not_done;
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp).
do_send(Sock, Sock_type, Resp),
timer:sleep(random:uniform(100)).
do_send(Sock, tcp, Resp) ->
gen_tcp:send(Sock, Resp);
do_send(Sock, ssl, Resp) ->
ssl:send(Sock, Resp).
%%------------------------------------------------------------------------------
%% Utility functions
%%------------------------------------------------------------------------------

Loading…
取消
儲存