Преглед изворни кода

Various changes. See README for details

pull/16/head semver
Chandrashekhar Mullaparthi пре 14 година
родитељ
комит
d756a2b0b6
7 измењених фајлова са 319 додато и 109 уклоњено
  1. +23
    -3
      README
  2. +8
    -3
      doc/ibrowse.html
  3. +66
    -13
      src/ibrowse.erl
  4. +155
    -81
      src/ibrowse_http_client.erl
  5. +22
    -1
      src/ibrowse_lb.erl
  6. +44
    -7
      src/ibrowse_test.erl
  7. +1
    -1
      vsn.mk

+ 23
- 3
README Прегледај датотеку

@ -18,12 +18,32 @@ ibrowse is available under two different licenses. LGPL or the BSD license.
Comments to : Chandrashekhar.Mullaparthi@gmail.com Comments to : Chandrashekhar.Mullaparthi@gmail.com
Version : 1.6.2
Version : 2.0.0
Latest version : git://github.com/cmullaparthi/ibrowse.git Latest version : git://github.com/cmullaparthi/ibrowse.git
CONTRIBUTIONS & CHANGE HISTORY CONTRIBUTIONS & CHANGE HISTORY
============================== ==============================
22-09-2010 - * Added option preserve_chunked_encoding. This allows the caller to get
the raw HTTP response when the Transfer-Encoding is Chunked. This feature
was requested by Benoit Chesneau who wanted to write a HTTP proxy using
ibrowse.
* Fixed bug with the {stream_to, {Pid, once}} option. Bug report and lot
of help from Filipe David Manana. Thank you Filipe.
* The {error, conn_failed} and {error, send_failed} return values are
now of the form {error, {conn_failed, Err}} and
{error, {send_failed, Err}}. This is so that the specific socket error
can be returned to the caller. I think it looks a bit ugly, but that
is the best compromise I could come up with.
* Added application configuration parameters default_max_sessions and
default_max_pipeline_size. These were previously hard coded to 10.
* Versioning of ibrowse now follows the Semantic Versioning principles.
See http://semver.org. Thanks to Anthony Molinaro for nudging me in
this direction.
* The connect_timeout option now only applies to the connection setup
phase. In previous versions, the time taken to setup the connection
was deducted from the specified timeout value for the request.
17-07-2010 - * Merged change made by Filipe David Manana to use the base64 17-07-2010 - * Merged change made by Filipe David Manana to use the base64
module for encoding/decoding. module for encoding/decoding.
@ -153,7 +173,7 @@ CONTRIBUTIONS & CHANGE HISTORY
12-01-2007 - Derek Upham sent in a bug fix. The reset_state function was not 12-01-2007 - Derek Upham sent in a bug fix. The reset_state function was not
behaving correctly when the transfer encoding was not chunked. behaving correctly when the transfer encoding was not chunked.
13-11-2006 - Youn�s Hafri reported a bug where ibrowse was not returning the
13-11-2006 - Youn�s Hafri reported a bug where ibrowse was not returning the
temporary filename when the server was closing the connection temporary filename when the server was closing the connection
after sending the data (as in HTTP/1.0). after sending the data (as in HTTP/1.0).
Released ibrowse under the BSD license Released ibrowse under the BSD license
@ -172,7 +192,7 @@ CONTRIBUTIONS & CHANGE HISTORY
22-Nov-2005 - Added ability to generate requests using the Chunked 22-Nov-2005 - Added ability to generate requests using the Chunked
Transfer-Encoding. Transfer-Encoding.
08-May-2005 - Youn�s Hafri made a CRUX LINUX port of ibrowse.
08-May-2005 - Youn�s Hafri made a CRUX LINUX port of ibrowse.
http://yhafri.club.fr/crux/index.html http://yhafri.club.fr/crux/index.html
Here are some usage examples. Enjoy! Here are some usage examples. Enjoy!

+ 8
- 3
doc/ibrowse.html Прегледај датотеку

@ -12,7 +12,7 @@
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client. <ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client.
<p>Copyright © 2005-2010 Chandrashekhar Mullaparthi</p> <p>Copyright © 2005-2010 Chandrashekhar Mullaparthi</p>
<p><b>Version:</b> 1.6.0</p>
<p><b>Version:</b> 2.0.0</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p> <p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p> <p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>
@ -202,7 +202,7 @@ send_req/4, send_req/5, send_req/6.

<div class="spec"> <div class="spec">
<p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt> <p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li> <ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li>
<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>}</tt></li>
<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>} | {preserve_chunked_encoding, <a href="#type-boolean">boolean()</a>}</tt></li>
<li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li> <li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li>
<li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li> <li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li>
<li><tt><a name="type-username">username()</a> = string()</tt></li> <li><tt><a name="type-username">username()</a> = string()</tt></li>
@ -284,6 +284,11 @@ send_req/4, send_req/5, send_req/6.

caller to get access to the raw status line and raw unparsed caller to get access to the raw status line and raw unparsed
headers. Not quite sure why someone would want this, but one of my headers. Not quite sure why someone would want this, but one of my
users asked for it, so here it is. </li> users asked for it, so here it is. </li>
<li> The <code>preserve_chunked_encoding</code> option enables the caller
to receive the raw data stream when the Transfer-Encoding of the server
response is Chunked.
</li>
</ul> </ul>
</p> </p>
@ -441,6 +446,6 @@ send_req/4, send_req/5, send_req/6.

<hr> <hr>
<div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div> <div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div>
<p><i>Generated by EDoc, May 17 2010, 23:21:42.</i></p>
<p><i>Generated by EDoc, Sep 22 2010, 22:56:44.</i></p>
</body> </body>
</html> </html>

+ 66
- 13
src/ibrowse.erl Прегледај датотеку

@ -7,7 +7,7 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com> %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2010 Chandrashekhar Mullaparthi %% @copyright 2005-2010 Chandrashekhar Mullaparthi
%% @version 1.6.0
%% @version 2.0.0
%% @doc The ibrowse application implements an HTTP 1.1 client. This %% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named %% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@ -236,6 +236,11 @@ send_req(Url, Headers, Method, Body) ->
%% caller to get access to the raw status line and raw unparsed %% caller to get access to the raw status line and raw unparsed
%% headers. Not quite sure why someone would want this, but one of my %% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li> %% users asked for it, so here it is. </li>
%%
%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
%% to receive the raw data stream when the Transfer-Encoding of the server
%% response is Chunked.
%% </li>
%% </ul> %% </ul>
%% %%
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response() %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
@ -266,7 +271,8 @@ send_req(Url, Headers, Method, Body) ->
%% {socket_options, Sock_opts} | %% {socket_options, Sock_opts} |
%% {transfer_encoding, {chunked, ChunkSize}} | %% {transfer_encoding, {chunked, ChunkSize}} |
%% {headers_as_is, boolean()} | %% {headers_as_is, boolean()} |
%% {give_raw_headers, boolean()}
%% {give_raw_headers, boolean()} |
%% {preserve_chunked_encoding,boolean()}
%% %%
%% stream_to() = process() | {process(), once} %% stream_to() = process() | {process(), once}
%% process() = pid() | atom() %% process() = pid() | atom()
@ -302,23 +308,46 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Options_1 = merge_options(Host, Port, Options), Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} = {SSLOptions, IsSSL} =
case (Protocol == https) orelse case (Protocol == https) orelse
get_value(is_ssl, Options_1, false) of
get_value(is_ssl, Options_1, false) of
false -> {[], false}; false -> {[], false};
true -> {get_value(ssl_options, Options_1, []), true} true -> {get_value(ssl_options, Options_1, []), true}
end, end,
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions, Max_sessions,
Max_pipeline_size, Max_pipeline_size,
{SSLOptions, IsSSL}) of {SSLOptions, IsSSL}) of
{ok, Conn_Pid} ->
do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout);
Err ->
Err
{ok, Conn_Pid} ->
case do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout) of
{error, sel_conn_closed} ->
io:format("Selected connection closed. Trying again...~n", []),
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Try_count + 1);
Res ->
Res
end; end;
Err -> Err ->
{error, {url_parsing_failed, Err}}
end.
Err
end;
try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.
merge_options(Host, Port, Options) -> merge_options(Host, Port, Options) ->
Config_options = get_config_value({options, Host, Port}, []), Config_options = get_config_value({options, Host, Port}, []),
@ -337,11 +366,27 @@ get_lb_pid(Url) ->
get_max_sessions(Host, Port, Options) -> get_max_sessions(Host, Port, Options) ->
get_value(max_sessions, Options, get_value(max_sessions, Options,
get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
get_config_value({max_sessions, Host, Port},
default_max_sessions())).
get_max_pipeline_size(Host, Port, Options) -> get_max_pipeline_size(Host, Port, Options) ->
get_value(max_pipeline_size, Options, get_value(max_pipeline_size, Options,
get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
get_config_value({max_pipeline_size, Host, Port},
default_max_pipeline_size())).
default_max_sessions() ->
safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS).
default_max_pipeline_size() ->
safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE).
safe_get_env(App, Key, Def_val) ->
case application:get_env(App, Key) of
undefined ->
Def_val;
{ok, Val} ->
Val
end.
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3 %% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect. %% for achieving the same effect.
@ -375,6 +420,10 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Options, Timeout) of Options, Timeout) of
{'EXIT', {timeout, _}} -> {'EXIT', {timeout, _}} ->
{error, req_timedout}; {error, req_timedout};
{'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
{error, sel_conn_closed};
{error, connection_closed} ->
{error, sel_conn_closed};
{'EXIT', Reason} -> {'EXIT', Reason} ->
{error, {'EXIT', Reason}}; {error, {'EXIT', Reason}};
{ok, St_code, Headers, Body} = Ret when is_binary(Body) -> {ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
@ -684,6 +733,10 @@ handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
do_trace("IBROWSE shutting down~n", []), do_trace("IBROWSE shutting down~n", []),
ets:foldl(fun(#lb_pid{pid = Pid}, Acc) ->
ibrowse_lb:stop(Pid),
Acc
end, [], ibrowse_lb),
{stop, normal, ok, State}; {stop, normal, ok, State};
handle_call({set_config_value, Key, Val}, _From, State) -> handle_call({set_config_value, Key, Val}, _From, State) ->

+ 155
- 81
src/ibrowse_http_client.erl Прегледај датотеку

@ -47,7 +47,8 @@
status_line, raw_headers, status_line, raw_headers,
is_closing, send_timer, content_length, is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding, deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
chunk_size, chunk_size_buffer = <<>>,
recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}). }).
@ -57,7 +58,7 @@
req_id, req_id,
stream_chunk_size, stream_chunk_size,
save_response_to_file = false, save_response_to_file = false,
tmp_file_name, tmp_file_fd,
tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}). response_format}).
-import(ibrowse_lib, [ -import(ibrowse_lib, [
@ -82,8 +83,13 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []). gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) -> stop(Conn_pid) ->
catch gen_server:call(Conn_pid, stop),
ok.
case catch gen_server:call(Conn_pid, stop) of
{'EXIT', {timeout, _}} ->
exit(Conn_pid, kill),
ok;
_ ->
ok
end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call( gen_server:call(
@ -171,6 +177,7 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called)
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State); handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) -> handle_info({ssl, _Sock, Data}, State) ->
@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) ->
handle_info({stream_next, Req_id}, #state{socket = Socket, handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) -> cur_req = #request{req_id = Req_id}} = State) ->
%% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State), do_setopts(Socket, [{active, once}], State),
{noreply, State}; {noreply, State};
handle_info({stream_next, _Req_id}, State) -> handle_info({stream_next, _Req_id}, State) ->
{noreply, State}; {noreply, State};
handle_info({tcp_closed, _Sock}, State) ->
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []), do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State), handle_sock_closed(State),
{stop, normal, State}; {stop, normal, State};
@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) ->
{stop, normal, State}; {stop, normal, State};
handle_info({tcp_error, _Sock}, State) -> handle_info({tcp_error, _Sock}, State) ->
io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State), handle_sock_closed(State),
{stop, normal, State}; {stop, normal, State};
handle_info({ssl_error, _Sock}, State) -> handle_info({ssl_error, _Sock}, State) ->
io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State), handle_sock_closed(State),
{stop, normal, State}; {stop, normal, State};
@ -233,7 +241,8 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server) %% Returns: any (ignored by gen_server)
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
terminate(_Reason, State) -> terminate(_Reason, State) ->
do_close(State).
do_close(State),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Func: code_change/3 %% Func: code_change/3
@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) ->
end; end;
handle_sock_data(Data, #state{status = get_body, handle_sock_data(Data, #state{status = get_body,
socket = Socket,
content_length = CL, content_length = CL,
http_status_code = StatCode, http_status_code = StatCode,
recvd_headers = Headers, recvd_headers = Headers,
@ -293,6 +303,21 @@ handle_sock_data(Data, #state{status = get_body,
fail_pipelined_requests(State, fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}), {error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State}; {stop, normal, State};
#state{cur_req = #request{caller_controls_socket = Ccs},
interim_reply_sent = Irs} = State_1 ->
%% io:format("Irs: ~p~n", [Irs]),
case Irs of
true ->
active_once(State_1);
false when Ccs == true ->
%% io:format("Setting {active,once}~n", []),
do_setopts(Socket, [{active, once}], State);
false ->
active_once(State_1)
end,
State_2 = State_1#state{interim_reply_sent = false},
set_inac_timer(State_2),
{noreply, State_2};
State_1 -> State_1 ->
active_once(State_1), active_once(State_1),
set_inac_timer(State_1), set_inac_timer(State_1),
@ -338,17 +363,25 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf
{error, Reason} -> {error, Reason} ->
{error, {file_write_error, Reason}} {error, {file_write_error, Reason}}
end; end;
accumulate_response(<<>>, State) ->
State;
accumulate_response(Data, #state{reply_buffer = RepBuf,
rep_buf_size = RepBufSize,
streamed_size = Streamed_size,
cur_req = CurReq}=State) ->
#request{stream_to=StreamTo, req_id=ReqId,
stream_chunk_size = Stream_chunk_size,
response_format = Response_format,
caller_controls_socket = Caller_controls_socket} = CurReq,
RepBuf_1 = list_to_binary([RepBuf, Data]),
%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
%% socket = Socket} = State) ->
%% case Ccs of
%% true ->
%% do_setopts(Socket, [{active, once}], State);
%% false ->
%% ok
%% end,
%% State;
accumulate_response(Data, #state{reply_buffer = RepBuf,
rep_buf_size = RepBufSize,
streamed_size = Streamed_size,
cur_req = CurReq}=State) ->
#request{stream_to = StreamTo,
req_id = ReqId,
stream_chunk_size = Stream_chunk_size,
response_format = Response_format,
caller_controls_socket = Caller_controls_socket} = CurReq,
RepBuf_1 = <<RepBuf/binary, Data/binary>>,
New_data_size = RepBufSize - Streamed_size, New_data_size = RepBufSize - Streamed_size,
case StreamTo of case StreamTo of
undefined -> undefined ->
@ -356,15 +389,21 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
_ when Caller_controls_socket == true -> _ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1), do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>, State#state{reply_buffer = <<>>,
interim_reply_sent = true,
streamed_size = Streamed_size + size(RepBuf_1)}; streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size -> _ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
accumulate_response(
Rem_data,
State#state{
reply_buffer = <<>>,
streamed_size = Streamed_size + Stream_chunk_size});
State_1 = State#state{
reply_buffer = <<>>,
interim_reply_sent = true,
streamed_size = Streamed_size + Stream_chunk_size},
case Rem_data of
<<>> ->
State_1;
_ ->
accumulate_response(Rem_data, State_1)
end;
_ -> _ ->
State#state{reply_buffer = RepBuf_1} State#state{reply_buffer = RepBuf_1}
end. end.
@ -498,9 +537,9 @@ do_close(#state{socket = Sock,
is_ssl = true, is_ssl = true,
use_proxy = true, use_proxy = true,
proxy_tunnel_setup = Pts proxy_tunnel_setup = Pts
}) when Pts /= done -> gen_tcp:close(Sock);
do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
}) when Pts /= done -> catch gen_tcp:close(Sock);
do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) -> active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok; ok;
@ -542,25 +581,17 @@ send_req_1(From,
end, end,
State_2 = check_ssl_options(Options, State_1), State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []), do_trace("Connecting...~n", []),
Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout), Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} -> {ok, Sock} ->
do_trace("Connected!~n", []),
End_ts = now(),
Timeout_1 = case Timeout of
infinity ->
infinity;
_ ->
Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
end,
do_trace("Connected! Socket: ~1000.p~n", [Sock]),
State_3 = State_2#state{socket = Sock, State_3 = State_2#state{socket = Sock,
connect_timeout = Conn_timeout}, connect_timeout = Conn_timeout},
send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err -> Err ->
shutting_down(State_2), shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]), do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, conn_failed}),
gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2} {stop, normal, State_2}
end; end;
@ -580,8 +611,9 @@ send_req_1(From,
use_proxy = true, use_proxy = true,
is_ssl = true} = State) -> is_ssl = true} = State) ->
NewReq = #request{ NewReq = #request{
method = connect,
options = Options
method = connect,
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
options = Options
}, },
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1), Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@ -611,13 +643,13 @@ send_req_1(From,
Err -> Err ->
shutting_down(State_1), shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]), do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1} {stop, normal, State_1}
end; end;
Err -> Err ->
shutting_down(State_1), shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]), do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1} {stop, normal, State_1}
end; end;
@ -666,7 +698,9 @@ send_req_1(From,
save_response_to_file = SaveResponseToFile, save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options), stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format, response_format = Resp_format,
from = From},
from = From,
preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method, {Req, Body_1} = make_request(Method,
@ -705,13 +739,13 @@ send_req_1(From,
Err -> Err ->
shutting_down(State_1), shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]), do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1} {stop, normal, State_1}
end; end;
Err -> Err ->
shutting_down(State_1), shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]), do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1} {stop, normal, State_1}
end. end.
@ -768,14 +802,14 @@ http_auth_digest(Username, Password) ->
ibrowse_lib:encode_base64(Username ++ [$: | Password]). ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options, make_request(Method, Headers, AbsPath, RelPath, Body, Options,
#state{use_proxy = UseProxy}) ->
#state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})), HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 = Headers_1 =
case get_value(content_length, Headers, false) of case get_value(content_length, Headers, false) of
false when (Body == []) or false when (Body == []) or
(Body == <<>>) or
is_tuple(Body) or
is_function(Body) ->
(Body == <<>>) or
is_tuple(Body) or
is_function(Body) ->
Headers; Headers;
false when is_binary(Body) -> false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers]; [{"content-length", integer_to_list(size(Body))} | Headers];
@ -799,7 +833,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
Headers_3 = cons_headers(Headers_2), Headers_3 = cons_headers(Headers_2),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true -> true ->
AbsPath;
case Is_ssl of
true ->
RelPath;
false ->
AbsPath
end;
false -> false ->
RelPath RelPath
end, end,
@ -1017,7 +1056,7 @@ upgrade_to_ssl(#state{socket = Socket,
send_queued_requests(lists:reverse(Q), State_1); send_queued_requests(lists:reverse(Q), State_1);
Err -> Err ->
do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]), do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
do_error_reply(State, {error, send_failed}),
do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed} {error, send_failed}
end. end.
@ -1029,12 +1068,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q],
case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
{noreply, State_1} -> {noreply, State_1} ->
send_queued_requests(Q, State_1); send_queued_requests(Q, State_1);
_ ->
Err ->
do_trace("Error sending queued SSL request: ~n" do_trace("Error sending queued SSL request: ~n"
"URL : ~s~n" "URL : ~s~n"
"Method : ~p~n" "Method : ~p~n"
"Headers : ~p~n", [Url, Method, Headers]), "Headers : ~p~n", [Url, Method, Headers]),
do_error_reply(State, {error, send_failed}),
do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed} {error, send_failed}
end. end.
@ -1046,11 +1085,12 @@ is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk %% This clause determines the chunk size when given data from the beginning of the chunk
parse_11_response(DataRecvd, parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, #state{transfer_encoding = chunked,
chunk_size = chunk_start,
chunk_size = chunk_start,
chunk_size_buffer = Chunk_sz_buf chunk_size_buffer = Chunk_sz_buf
} = State) -> } = State) ->
case scan_crlf(Chunk_sz_buf, DataRecvd) of case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} -> {yes, ChunkHeader, Data_1} ->
State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
ChunkSize = parse_chunk_header(ChunkHeader), ChunkSize = parse_chunk_header(ChunkHeader),
%% %%
%% Do we have to preserve the chunk encoding when %% Do we have to preserve the chunk encoding when
@ -1061,10 +1101,10 @@ parse_11_response(DataRecvd,
RemLen = size(Data_1), RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n", do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
[ChunkSize, RemLen]), [ChunkSize, RemLen]),
parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
deleted_crlf = true,
recvd_chunk_size = 0,
chunk_size = ChunkSize});
parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
deleted_crlf = true,
recvd_chunk_size = 0,
chunk_size = ChunkSize});
{no, Data_1} -> {no, Data_1} ->
State#state{chunk_size_buffer = Data_1} State#state{chunk_size_buffer = Data_1}
end; end;
@ -1074,13 +1114,15 @@ parse_11_response(DataRecvd,
parse_11_response(DataRecvd, parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, #state{transfer_encoding = chunked,
chunk_size = tbd, chunk_size = tbd,
chunk_size_buffer = Buf}=State) ->
chunk_size_buffer = Buf
} = State) ->
case scan_crlf(Buf, DataRecvd) of case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} -> {yes, _, NextChunk} ->
State_1 = State#state{chunk_size = chunk_start,
chunk_size_buffer = <<>>,
deleted_crlf = true},
parse_11_response(NextChunk, State_1);
State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
State_2 = State_1#state{chunk_size = chunk_start,
chunk_size_buffer = <<>>,
deleted_crlf = true},
parse_11_response(NextChunk, State_2);
{no, Data_1} -> {no, Data_1} ->
State#state{chunk_size_buffer = Data_1} State#state{chunk_size_buffer = Data_1}
end; end;
@ -1090,9 +1132,10 @@ parse_11_response(DataRecvd,
%% received is silently discarded. %% received is silently discarded.
parse_11_response(DataRecvd, parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, chunk_size = 0, #state{transfer_encoding = chunked, chunk_size = 0,
cur_req = CurReq,
deleted_crlf = DelCrlf,
chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
cur_req = CurReq,
deleted_crlf = DelCrlf,
chunk_size_buffer = Trailer,
reqs = Reqs} = State) ->
do_trace("Detected end of chunked transfer...~n", []), do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of DataRecvd_1 = case DelCrlf of
false -> false ->
@ -1101,12 +1144,14 @@ parse_11_response(DataRecvd,
<<$\r, $\n, DataRecvd/binary>> <<$\r, $\n, DataRecvd/binary>>
end, end,
case scan_header(Trailer, DataRecvd_1) of case scan_header(Trailer, DataRecvd_1) of
{yes, _TEHeaders, Rem} ->
{yes, TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs), {_, Reqs_1} = queue:out(Reqs),
State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
parse_response(Rem, reset_state(State_1));
State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
State_2 = handle_response(CurReq,
State_1#state{reqs = Reqs_1}),
parse_response(Rem, reset_state(State_2));
{no, Rem} -> {no, Rem} ->
State#state{chunk_size_buffer = Rem, deleted_crlf = false}
accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
end; end;
%% This clause extracts a chunk, given the size. %% This clause extracts a chunk, given the size.
@ -1121,8 +1166,15 @@ parse_11_response(DataRecvd,
case DataLen >= NeedBytes of case DataLen >= NeedBytes of
true -> true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
do_trace("Recvd another chunk...~n", []),
do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]), do_trace("RemData -> ~p~n", [RemData]),
case RemData of
<<>> ->
%% io:format("RemData -> ~p~n", [RemData]);
ok;
_ ->
ok
end,
case accumulate_response(RemChunk, State) of case accumulate_response(RemChunk, State) of
{error, Reason} -> {error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]), do_trace("Error accumulating response --> ~p~n", [Reason]),
@ -1155,6 +1207,11 @@ parse_11_response(DataRecvd,
accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)}) accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end. end.
maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
State;
maybe_accumulate_ce_data(State, Data) ->
accumulate_response(Data, State).
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format, response_format = Resp_format,
save_response_to_file = SaveResponseToFile, save_response_to_file = SaveResponseToFile,
@ -1177,11 +1234,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ -> _ ->
{file, TmpFilename} {file, TmpFilename}
end, end,
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of Reply = case get_value(give_raw_headers, Options, false) of
true -> true ->
{ok, Status_line, Raw_headers, ResponseBody};
{ok, Status_line, Raw_headers_1, ResponseBody};
false -> false ->
{ok, SCode, RespHeaders, ResponseBody}
{ok, SCode, Resp_headers_1, ResponseBody}
end, end,
State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply), State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}), cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@ -1192,16 +1250,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
#state{http_status_code = SCode, #state{http_status_code = SCode,
status_line = Status_line, status_line = Status_line,
raw_headers = Raw_headers, raw_headers = Raw_headers,
recvd_headers = RespHeaders,
recvd_headers = Resp_headers,
reply_buffer = RepBuf, reply_buffer = RepBuf,
send_timer = ReqTimer} = State) -> send_timer = ReqTimer} = State) ->
Body = RepBuf, Body = RepBuf,
%% State_1 = set_cur_request(State), %% State_1 = set_cur_request(State),
{Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of Reply = case get_value(give_raw_headers, Options, false) of
true -> true ->
{ok, Status_line, Raw_headers, Body};
{ok, Status_line, Raw_headers_1, Body};
false -> false ->
{ok, SCode, RespHeaders, Body}
{ok, SCode, Resp_headers_1, Body}
end, end,
State_1 = case get(conn_close) of State_1 = case get(conn_close) of
"close" -> "close" ->
@ -1227,7 +1286,8 @@ reset_state(State) ->
deleted_crlf = false, deleted_crlf = false,
http_status_code = undefined, http_status_code = undefined,
chunk_size = undefined, chunk_size = undefined,
transfer_encoding = undefined}.
transfer_encoding = undefined
}.
set_cur_request(#state{reqs = Reqs} = State) -> set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of case queue:to_list(Reqs) of
@ -1459,15 +1519,29 @@ send_async_headers(_ReqId, undefined, _, _State) ->
ok; ok;
send_async_headers(ReqId, StreamTo, Give_raw_headers, send_async_headers(ReqId, StreamTo, Give_raw_headers,
#state{status_line = Status_line, raw_headers = Raw_headers, #state{status_line = Status_line, raw_headers = Raw_headers,
recvd_headers = Headers, http_status_code = StatCode
}) ->
recvd_headers = Headers, http_status_code = StatCode,
cur_req = #request{options = Opts}
}) ->
{Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
case Give_raw_headers of case Give_raw_headers of
false -> false ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
true -> true ->
catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end. end.
maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
Custom_headers = get_value(add_custom_headers, Opts, []),
Headers_1 = Headers ++ Custom_headers,
Raw_headers_1 = case Custom_headers of
[_ | _] when is_binary(Raw_headers) ->
Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
<<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
_ ->
Raw_headers
end,
{Headers_1, Raw_headers_1}.
format_response_data(Resp_format, Body) -> format_response_data(Resp_format, Body) ->
case Resp_format of case Resp_format of
list when is_list(Body) -> list when is_list(Body) ->

+ 22
- 1
src/ibrowse_lb.erl Прегледај датотеку

@ -16,7 +16,8 @@
%% External exports %% External exports
-export([ -export([
start_link/1, start_link/1,
spawn_connection/5
spawn_connection/5,
stop/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -85,6 +86,14 @@ spawn_connection(Lb_pid, Url,
is_integer(Max_sessions) -> is_integer(Max_sessions) ->
gen_server:call(Lb_pid, gen_server:call(Lb_pid,
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}). {spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
stop(Lb_pid) ->
case catch gen_server:call(Lb_pid, stop) of
{'EXIT', {timeout, _}} ->
exit(Lb_pid, kill);
ok ->
ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Function: handle_call/3 %% Function: handle_call/3
%% Description: Handling call messages %% Description: Handling call messages
@ -120,6 +129,18 @@ handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
ets:insert(Tid, {{1, Pid}, []}), ets:insert(Tid, {{1, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}}; {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
gen_server:reply(_From, ok),
{stop, normal, State};
handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
ets:foldl(fun({{_, Pid}, _}, Acc) ->
ibrowse_http_client:stop(Pid),
Acc
end, [], Tid),
gen_server:reply(_From, ok),
{stop, normal, State};
handle_call(Request, _From, State) -> handle_call(Request, _From, State) ->
Reply = {unknown_request, Request}, Reply = {unknown_request, Request},
{reply, Reply, State}. {reply, Reply, State}.

+ 44
- 7
src/ibrowse_test.erl Прегледај датотеку

@ -17,6 +17,7 @@
ue_test/1, ue_test/1,
verify_chunked_streaming/0, verify_chunked_streaming/0,
verify_chunked_streaming/1, verify_chunked_streaming/1,
test_chunked_streaming_once/0,
i_do_async_req_list/4, i_do_async_req_list/4,
test_stream_once/3, test_stream_once/3,
test_stream_once/4 test_stream_once/4
@ -260,7 +261,20 @@ verify_chunked_streaming(Options) ->
io:format("Fetching data with streaming as binary...~n", []), io:format("Fetching data with streaming as binary...~n", []),
Async_response_bin = do_async_req_list( Async_response_bin = do_async_req_list(
Url, get, [{response_format, binary} | Options]), Url, get, [{response_format, binary} | Options]),
compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
io:format("Fetching data with streaming as binary, {active, once}...~n", []),
Async_response_bin_once = do_async_req_list(
Url, get, [once, {response_format, binary} | Options]),
compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once).
test_chunked_streaming_once() ->
test_chunked_streaming_once([]).
test_chunked_streaming_once(Options) ->
Url = "http://www.httpwatch.com/httpgallery/chunked/",
io:format("URL: ~s~n", [Url]),
io:format("Fetching data with streaming as binary, {active, once}...~n", []),
do_async_req_list(Url, get, [once, {response_format, binary} | Options]).
compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) -> compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
success; success;
@ -313,31 +327,54 @@ wait_for_resp(Pid) ->
Msg -> Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]), io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid) wait_for_resp(Pid)
after 10000 ->
after 100000 ->
{error, timeout} {error, timeout}
end. end.
i_do_async_req_list(Parent, Url, Method, Options) -> i_do_async_req_list(Parent, Url, Method, Options) ->
Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
Options_1 = case lists:member(once, Options) of
true ->
[{stream_to, {self(), once}} | (Options -- [once])];
false ->
[{stream_to, self()} | Options]
end,
Res = ibrowse:send_req(Url, [], Method, [], Options_1),
case Res of case Res of
{ibrowse_req_id, Req_id} -> {ibrowse_req_id, Req_id} ->
Result = wait_for_async_resp(Req_id, undefined, undefined, []),
Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []),
Parent ! {async_result, self(), Result}; Parent ! {async_result, self(), Result};
Err -> Err ->
Parent ! {async_result, self(), Err} Parent ! {async_result, self(), Err}
end. end.
wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) ->
receive receive
{ibrowse_async_headers, Req_id, StatCode, Headers} -> {ibrowse_async_headers, Req_id, StatCode, Headers} ->
wait_for_async_resp(Req_id, StatCode, Headers, Body);
%% io:format("Recvd headers...~n", []),
maybe_stream_next(Req_id, Options),
wait_for_async_resp(Req_id, Options, StatCode, Headers, Body);
{ibrowse_async_response_end, Req_id} -> {ibrowse_async_response_end, Req_id} ->
io:format("Recvd end of response.~n", []),
Body_1 = list_to_binary(lists:reverse(Body)), Body_1 = list_to_binary(lists:reverse(Body)),
{ok, Acc_Stat_code, Acc_Headers, Body_1}; {ok, Acc_Stat_code, Acc_Headers, Body_1};
{ibrowse_async_response, Req_id, Data} -> {ibrowse_async_response, Req_id, Data} ->
wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
maybe_stream_next(Req_id, Options),
%% io:format("Recvd data...~n", []),
wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]);
{ibrowse_async_response, Req_id, {error, _} = Err} ->
{ok, Acc_Stat_code, Acc_Headers, Err};
Err -> Err ->
{ok, Acc_Stat_code, Acc_Headers, Err} {ok, Acc_Stat_code, Acc_Headers, Err}
after 10000 ->
{timeout, Acc_Stat_code, Acc_Headers, Body}
end.
maybe_stream_next(Req_id, Options) ->
case lists:member(once, Options) of
true ->
ibrowse:stream_next(Req_id);
false ->
ok
end. end.
execute_req(Url, Method, Options) -> execute_req(Url, Method, Options) ->

+ 1
- 1
vsn.mk Прегледај датотеку

@ -1,2 +1,2 @@
IBROWSE_VSN = 1.6.2
IBROWSE_VSN = 2.0.0

Loading…
Откажи
Сачувај