From cf294d5c8846b0a7b347c1eeaddc95fc08060706 Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Wed, 1 Jul 2009 23:49:02 +0100 Subject: [PATCH] ibrowse-1.5.0. See README for details --- README | 31 +- doc/ibrowse.html | 95 ++++-- ebin/ibrowse.app | 6 +- src/ibrowse.erl | 110 ++++++- src/ibrowse_http_client.erl | 620 ++++++++++++++++++++++-------------- src/ibrowse_lb.erl | 9 +- src/ibrowse_test.erl | 131 +++++++- vsn.mk | 2 +- 8 files changed, 688 insertions(+), 316 deletions(-) diff --git a/README b/README index b0e2a7e..9e2473a 100644 --- a/README +++ b/README @@ -1,5 +1,3 @@ -$Id: README,v 1.16 2008/05/21 15:28:11 chandrusf Exp $ - ibrowse is a HTTP client. The following are a list of features. - RFC2616 compliant (AFAIK) - supports GET, POST, OPTIONS, HEAD, PUT, DELETE, TRACE, @@ -18,10 +16,37 @@ ibrowse is a HTTP client. The following are a list of features. ibrowse is available under two different licenses. LGPL and the BSD license. -Comments to : Chandrashekhar.Mullaparthi@t-mobile.co.uk +Comments to : Chandrashekhar.Mullaparthi@gmail.com + +Version : 1.5.0 + +Latest version : git://github.com/cmullaparthi/ibrowse.git CONTRIBUTIONS & CHANGE HISTORY ============================== +29-06-2009 - * Fixed following issues reported by Oscar Hellström + - Use {active, once} instead of {active, true} + - Fix 'dodgy' timeout handling + - Use binaries internally instead of lists to reduce memory + consumption on 64 bit platforms. The default response format + is still 'list' to maintain backwards compatibility. Use the + option {response_format, binary} to get responses as binaries. + * Fixed chunking bug (reported by Adam Kocoloski) + * Added new option {inactivity_timeout, Milliseconds} to timeout + requests if no data is received on the link for the specified + interval. Useful when responses are large and links are flaky. + * Added ibrowse:all_trace_off/0 to turn off all tracing + * Change to the way responses to asynchronous requests are + returned. The following messages have been removed. + * {ibrowse_async_response, Req_id, {chunk_start, Chunk_size}} + * {ibrowse_async_response, Req_id, chunk_end} + * Fixed Makefiles as part of Debian packaging + (thanks to Thomas Lindgren) + * Moved repository from Sourceforge to Github + +11-06-2009 - * Added option to control size of streamed chunks. Also added + option for the client to receive responses in binary format. + 21-05-2008 - * Fixed bug in reading some options from the ibrowse.conf file. Reported by Erik Reitsma on the erlyaws mailing list * Fixed bug when cleaning up closing connections diff --git a/doc/ibrowse.html b/doc/ibrowse.html index ff286e0..172b24a 100644 --- a/doc/ibrowse.html +++ b/doc/ibrowse.html @@ -10,9 +10,9 @@

Module ibrowse

The ibrowse application implements an HTTP 1.1 client. -

Copyright © 2005-2008 Chandrashekhar Mullaparthi

+

Copyright © 2005-2009 Chandrashekhar Mullaparthi

-

Version: 1.4

+

Version: 1.5.0

Behaviours: gen_server.

Authors: Chandrashekhar Mullaparthi (chandrashekhar dot mullaparthi at gmail dot com).

@@ -64,7 +64,8 @@ send_req/4, send_req/5, send_req/6.

speed achieved using only erlang has been good enough, so the driver isn't actually used.

Function Index

- +
code_change/3
+ @@ -111,44 +112,49 @@ send_req/4, send_req/5, send_req/6.

Function Details

+

all_trace_off/0

+
+

all_trace_off() -> ok

+

Turn Off ALL tracing
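A quick shell sketch of how this pairs with per-destination tracing (host and port are placeholders, not from the original docs):

  ibrowse:trace_on("www.example.com", 80),
  %% ... issue some requests to generate trace output ...
  ibrowse:all_trace_off().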

+

code_change/3

-

code_change() -> term()

+

code_change(OldVsn, State, Extra) -> any()

get_config_value/1

-

get_config_value() -> term()

+

get_config_value(Key) -> any()

Internal export

get_config_value/2

-

get_config_value() -> term()

+

get_config_value(Key, DefVal) -> any()

Internal export

handle_call/3

-

handle_call() -> term()

+

handle_call(Request, From, State) -> any()

handle_cast/2

-

handle_cast() -> term()

+

handle_cast(Msg, State) -> any()

handle_info/2

-

handle_info() -> term()

+

handle_info(Info, State) -> any()

init/1

-

init() -> term()

+

init(X1) -> any()

rescan_config/0

-

rescan_config() -> term()

+

rescan_config() -> any()

Clear current configuration for ibrowse and load from the file ibrowse.conf in the IBROWSE_EBIN/../priv directory. Current configuration is cleared only if the ibrowse.conf file is readable @@ -156,7 +162,7 @@ send_req/4, send_req/5, send_req/6.

rescan_config/1

-

rescan_config() -> term()

+

rescan_config(File) -> any()

send_req/3

@@ -193,7 +199,7 @@ send_req/4, send_req/5, send_req/6.

send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()

Same as send_req/4. For a description of SSL Options, look in the ssl manpage. If the HTTP Version to use is not specified, the default is 1.1.
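For instance, an SSL request might be made as follows (a sketch; the URL and the ssl options shown are illustrative only):

  ibrowse:send_req("https://www.example.com/", [], get, [],
                   [{is_ssl, true}, {ssl_options, [{depth, 2}]}]).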
-

The host_header is useful in the case where ibrowse is +

The host_header option is useful in the case where ibrowse is connecting to a component such as stunnel which then sets up a secure connection to a webserver. In this case, the URL supplied to ibrowse must have the stunnel host/port details, but that won't @@ -223,7 +230,35 @@ send_req/4, send_req/5, send_req/6.

  • Whenever an error occurs in the processing of a request, ibrowse will return as much information as it has, such as HTTP Status Code and HTTP Headers. When this happens, the response is of the form {error, {Reason, {stat_code, StatusCode}, HTTP_headers}}
  • -

    + +
• The inactivity_timeout option is useful when + dealing with large response bodies and/or slow links. In these + cases, it might be hard to estimate how long a request will take to + complete. In such cases, the client might want to time out if no + data has been received on the link for a certain time interval.
  • + +
• + The connect_timeout option specifies how long the + client process should wait for connection establishment. This is + useful in scenarios where connections to servers are usually set up + very fast, but responses might take much longer compared to + connection setup. In such cases, it is better for the calling + process to time out faster if there is a problem (DNS lookup + delays/failures, network routing issues, etc.). The total timeout + value specified for the request will be enforced. To illustrate with + an example: + + ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000). + + In the above invocation, if the connection isn't established within + 100 milliseconds, the request will fail with + {error, conn_failed}.
    + If connection setup succeeds, the total time allowed for the + request to complete will be 1000 milliseconds minus the time taken + for connection setup. +
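To make the two timeout options concrete, here is a hedged sketch combining them (the URL and values are illustrative only):

  ibrowse:send_req("http://www.example.com/big_download", [], get, [],
                   [{connect_timeout, 500},
                    {inactivity_timeout, 10000}],
                   60000).

Connection setup gets at most 500 milliseconds, the request as a whole gets 60 seconds, and the request is abandoned early if nothing arrives on the socket for 10 seconds.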
  • + +

    send_req/6

    @@ -235,31 +270,31 @@ send_req/4, send_req/5, send_req/6.

    send_req_direct/4

    -

    send_req_direct() -> term()

    +

    send_req_direct(Conn_pid, Url, Headers, Method) -> any()

    Same as send_req/3 except that the first argument is the PID returned by spawn_worker_process/2 or spawn_link_worker_process/2

    send_req_direct/5

    -

    send_req_direct() -> term()

    +

    send_req_direct(Conn_pid, Url, Headers, Method, Body) -> any()

    Same as send_req/4 except that the first argument is the PID returned by spawn_worker_process/2 or spawn_link_worker_process/2

    send_req_direct/6

    -

    send_req_direct() -> term()

    +

    send_req_direct(Conn_pid, Url, Headers, Method, Body, Options) -> any()

    Same as send_req/5 except that the first argument is the PID returned by spawn_worker_process/2 or spawn_link_worker_process/2

    send_req_direct/7

    -

    send_req_direct() -> term()

    +

    send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) -> any()

    Same as send_req/6 except that the first argument is the PID returned by spawn_worker_process/2 or spawn_link_worker_process/2
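For example (a sketch; this assumes spawn_worker_process/2 returns {ok, Pid} and that the host is reachable):

  {ok, Conn_pid} = ibrowse:spawn_worker_process("www.example.com", 80),
  {ok, _Status, _Headers, _Body} =
      ibrowse:send_req_direct(Conn_pid, "http://www.example.com/", [], get),
  ibrowse:stop_worker_process(Conn_pid).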

    set_dest/3

    -

    set_dest() -> term()

    +

    set_dest(Host, Port, T) -> any()

    Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3 for achieving the same effect.
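For example, where old code called set_dest/3 to bound connections to a server, the equivalent is now (illustrative values):

  ibrowse:set_max_sessions("www.example.com", 80, 20),
  ibrowse:set_max_pipeline_size("www.example.com", 80, 5).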

    @@ -275,7 +310,7 @@ send_req/4, send_req/5, send_req/6.

    show_dest_status/2

    -

    show_dest_status() -> term()

    +

    show_dest_status(Host, Port) -> any()

    Shows some internal information about load balancing to a specified Host:Port. Info about workers spawned using spawn_worker_process/2 or spawn_link_worker_process/2 is not @@ -283,7 +318,7 @@ send_req/4, send_req/5, send_req/6.

    spawn_link_worker_process/2

    -

    spawn_link_worker_process() -> term()

    +

    spawn_link_worker_process(Host, Port) -> any()

Same as spawn_worker_process/2 except that the calling process is linked to the worker process which is spawned.

    @@ -305,7 +340,7 @@ send_req/4, send_req/5, send_req/6.

    start/0

    -

    start() -> term()

    +

    start() -> any()

    Starts the ibrowse process without linking. Useful when testing using the shell

    start_link/0

    @@ -315,7 +350,7 @@ send_req/4, send_req/5, send_req/6.

    stop/0

    -

    stop() -> term()

    +

    stop() -> any()

    Stop the ibrowse process. Useful when testing using the shell.

    stop_worker_process/1

    @@ -327,28 +362,28 @@ send_req/4, send_req/5, send_req/6.

    terminate/2

    -

    terminate() -> term()

    +

    terminate(Reason, State) -> any()

    trace_off/0

    -

    trace_off() -> term()

    +

    trace_off() -> any()

    Turn tracing off for the ibrowse process

    trace_off/2

    -

    trace_off(Host, Port) -> term()

    +

    trace_off(Host, Port) -> ok

    Turn tracing OFF for all connections to the specified HTTP server.

    trace_on/0

    -

    trace_on() -> term()

    +

    trace_on() -> any()

    Turn tracing on for the ibrowse process

    trace_on/2

    -

    trace_on(Host, Port) -> term() +

    trace_on(Host, Port) -> ok

    • Host = string()
    • Port = integer()

    @@ -357,6 +392,6 @@ send_req/4, send_req/5, send_req/6.


all_trace_off/0  Turn Off ALL tracing.
code_change/3
get_config_value/1  Internal export.
get_config_value/2  Internal export.
    handle_call/3
    -

    Generated by EDoc, Mar 27 2008, 01:20:55.

    +

    Generated by EDoc, Jun 30 2009, 23:44:01.

    diff --git a/ebin/ibrowse.app b/ebin/ibrowse.app index 16e0f15..5e4621d 100644 --- a/ebin/ibrowse.app +++ b/ebin/ibrowse.app @@ -1,12 +1,12 @@ {application, ibrowse, [{description, "HTTP client application"}, - {vsn, "1.4"}, + {vsn, "1.5.0"}, {modules, [ ibrowse, ibrowse_http_client, ibrowse_app, ibrowse_sup, - ibrowse_lb, - ibrowse_lib ]}, + ibrowse_lib, + ibrowse_lb ]}, {registered, []}, {applications, [kernel,stdlib,sasl]}, {env, []}, diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 4e6404a..1b0daad 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -6,8 +6,8 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi -%% @copyright 2005-2008 Chandrashekhar Mullaparthi -%% @version 1.4 +%% @copyright 2005-2009 Chandrashekhar Mullaparthi +%% @version 1.5.0 %% @doc The ibrowse application implements an HTTP 1.1 client. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is @@ -57,7 +57,7 @@ %% driver isn't actually used.

    -module(ibrowse). --vsn('$Id: ibrowse.erl,v 1.7 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse.erl,v 1.8 2009/07/01 22:43:19 chandrusf Exp $ '). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -96,6 +96,7 @@ trace_off/0, trace_on/2, trace_off/2, + all_trace_off/0, show_dest_status/2 ]). @@ -105,8 +106,6 @@ -import(ibrowse_lib, [ parse_url/1, - printable_date/0, - get_value/2, get_value/3, do_trace/2 ]). @@ -114,6 +113,7 @@ -record(state, {trace = false}). -include("ibrowse.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -define(DEF_MAX_SESSIONS,10). -define(DEF_MAX_PIPELINE_SIZE,10). @@ -170,7 +170,7 @@ send_req(Url, Headers, Method, Body) -> %% For a description of SSL Options, look in the ssl manpage. If the %% HTTP Version to use is not specified, the default is 1.1. %%
    -%%

    The host_header is useful in the case where ibrowse is +%%

    The host_header option is useful in the case where ibrowse is %% connecting to a component such as stunnel which then sets up a %% secure connection to a webserver. In this case, the URL supplied to @@ -188,10 +188,40 @@ send_req(Url, Headers, Method, Body) -> %%

  • Whenever an error occurs in the processing of a request, ibrowse will return as much %% information as it has, such as HTTP Status Code and HTTP Headers. When this happens, the response %% is of the form {error, {Reason, {stat_code, StatusCode}, HTTP_headers}}
  • +%% +%%
• The inactivity_timeout option is useful when +%% dealing with large response bodies and/or slow links. In these +%% cases, it might be hard to estimate how long a request will take to +%% complete. In such cases, the client might want to time out if no +%% data has been received on the link for a certain time interval.
  • +%% +%%
• +%% The connect_timeout option specifies how long the +%% client process should wait for connection establishment. This is +%% useful in scenarios where connections to servers are usually set up +%% very fast, but responses might take much longer compared to +%% connection setup. In such cases, it is better for the calling +%% process to time out faster if there is a problem (DNS lookup +%% delays/failures, network routing issues, etc.). The total timeout +%% value specified for the request will be enforced. To illustrate with +%% an example: +%% +%% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000). +%% +%% In the above invocation, if the connection isn't established within +%% 100 milliseconds, the request will fail with +%% {error, conn_failed}.
    +%% If connection setup succeeds, the total time allowed for the +%% request to complete will be 1000 milliseconds minus the time taken +%% for connection setup. +%%
  • %% +%% %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response() %% optionList() = [option()] %% option() = {max_sessions, integer()} | +%% {response_format,response_format()}| +%% {stream_chunk_size, integer()} | %% {max_pipeline_size, integer()} | %% {trace, boolean()} | %% {is_ssl, boolean()} | @@ -210,8 +240,10 @@ send_req(Url, Headers, Method, Body) -> %% {stream_to, process()} | %% {http_vsn, {MajorVsn, MinorVsn}} | %% {host_header, string()} | +%% {inactivity_timeout, integer()} | +%% {connect_timeout, integer()} | %% {transfer_encoding, {chunked, ChunkSize}} -%% +%% %% process() = pid() | atom() %% username() = string() %% password() = string() @@ -219,7 +251,7 @@ send_req(Url, Headers, Method, Body) -> %% ChunkSize = integer() %% srtf() = boolean() | filename() %% filename() = string() -%% +%% response_format() = list | binary send_req(Url, Headers, Method, Body, Options) -> send_req(Url, Headers, Method, Body, Options, 30000). @@ -230,7 +262,8 @@ send_req(Url, Headers, Method, Body, Options) -> send_req(Url, Headers, Method, Body, Options, Timeout) -> case catch parse_url(Url) of #url{host = Host, - port = Port} = Parsed_url -> + port = Port, + protocol = Protocol} = Parsed_url -> Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of [] -> get_lb_pid(Parsed_url); @@ -241,9 +274,10 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> Max_pipeline_size = get_max_pipeline_size(Host, Port, Options), Options_1 = merge_options(Host, Port, Options), {SSLOptions, IsSSL} = - case get_value(is_ssl, Options_1, false) of + case (Protocol == https) orelse + get_value(is_ssl, Options_1, false) of false -> {[], false}; - true -> {get_value(ssl_options, Options_1), true} + true -> {get_value(ssl_options, Options_1, []), true} end, case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, Max_sessions, @@ -310,16 +344,28 @@ set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 -> do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url, - Headers, Method, Body, + Headers, Method, ensure_bin(Body), Options, Timeout) of {'EXIT', {timeout, _}} -> {error, req_timedout}; {'EXIT', Reason} -> {error, {'EXIT', Reason}}; + {ok, St_code, Headers, Body} = Ret when is_binary(Body) -> + case get_value(response_format, Options, list) of + list -> + {ok, St_code, Headers, binary_to_list(Body)}; + binary -> + Ret + end; Ret -> Ret end. +ensure_bin(L) when is_list(L) -> + list_to_binary(L); +ensure_bin(B) when is_binary(B) -> + B. + %% @doc Creates a HTTP client process to the specified Host:Port which %% is not part of the load balancing pool. This is useful in cases %% where some requests to a webserver might take a long time whereas @@ -389,17 +435,25 @@ trace_off() -> %% @doc Turn tracing on for all connections to the specified HTTP %% server. Host is whatever is specified as the domain name in the URL -%% @spec trace_on(Host, Port) -> term() +%% @spec trace_on(Host, Port) -> ok %% Host = string() %% Port = integer() trace_on(Host, Port) -> - ibrowse ! {trace, true, Host, Port}. + ibrowse ! {trace, true, Host, Port}, + ok. %% @doc Turn tracing OFF for all connections to the specified HTTP %% server. -%% @spec trace_off(Host, Port) -> term() +%% @spec trace_off(Host, Port) -> ok trace_off(Host, Port) -> - ibrowse ! {trace, false, Host, Port}. + ibrowse ! {trace, false, Host, Port}, + ok. 
+ +%% @doc Turn Off ALL tracing +%% @spec all_trace_off() -> ok +all_trace_off() -> + ibrowse ! all_trace_off, + ok. %% @doc Shows some internal information about load balancing to a %% specified Host:Port. Info about workers spawned using @@ -577,6 +631,30 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- +handle_info(all_trace_off, State) -> + Mspec = [{{ibrowse_conf,{trace,'$1','$2'},true},[],[{{'$1','$2'}}]}], + Trace_on_dests = ets:select(ibrowse_conf, Mspec), + Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) -> + case lists:member({H, P}, Trace_on_dests) of + false -> + ok; + true -> + catch Pid ! {trace, false} + end; + (#client_conn{key = {H, P, Pid}}, _) -> + case lists:member({H, P}, Trace_on_dests) of + false -> + ok; + true -> + catch Pid ! {trace, false} + end; + (_, Acc) -> + Acc + end, + ets:foldl(Fun, undefined, ibrowse_lb), + ets:select_delete(ibrowse_conf, [{{ibrowse_conf,{trace,'$1','$2'},true},[],['true']}]), + {noreply, State}; + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 9a0e4d3..24214ff 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -6,7 +6,7 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi %%%------------------------------------------------------------------- -module(ibrowse_http_client). --vsn('$Id: ibrowse_http_client.erl,v 1.18 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse_http_client.erl,v 1.19 2009/07/01 22:43:19 chandrusf Exp $ '). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -38,28 +38,33 @@ -include("ibrowse.hrl"). --record(state, {host, port, +-record(state, {host, port, use_proxy = false, proxy_auth_digest, - ssl_options = [], is_ssl = false, socket, - reqs=queue:new(), cur_req, status=idle, http_status_code, - reply_buffer=[], rep_buf_size=0, recvd_headers=[], + ssl_options = [], is_ssl = false, socket, + reqs=queue:new(), cur_req, status=idle, http_status_code, + reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0, + recvd_headers=[], is_closing, send_timer, content_length, - deleted_crlf = false, transfer_encoding, chunk_size, - chunks=[], lb_ets_tid, cur_pipeline_size = 0}). + deleted_crlf = false, transfer_encoding, + chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, + lb_ets_tid, cur_pipeline_size = 0 + }). -record(request, {url, method, options, from, stream_to, req_id, - save_response_to_file = false, - tmp_file_name, tmp_file_fd}). + stream_chunk_size, + save_response_to_file = false, + tmp_file_name, tmp_file_fd, + response_format}). -import(ibrowse_lib, [ - parse_url/1, - printable_date/0, get_value/2, get_value/3, do_trace/2 ]). +-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024). + %%==================================================================== %% External functions %%==================================================================== @@ -77,15 +82,9 @@ stop(Conn_pid) -> gen_server:call(Conn_pid, stop). send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> - Timeout_1 = case Timeout of - infinity -> - infinity; - _ when is_integer(Timeout) -> - Timeout + 100 - end, gen_server:call( Conn_Pid, - {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout_1). + {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout). 
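%% Usage sketch (not part of this patch; placeholder URL): the new
%% {response_format, binary} option documented in the ibrowse:send_req/5
%% spec earlier in this patch returns the response body as a binary
%% instead of the default list:
%%
%%   {ok, _Status, _Headers, Body} =
%%       ibrowse:send_req("http://www.example.com/", [], get, [],
%%                        [{response_format, binary}]),
%%   true = is_binary(Body).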
%%==================================================================== %% Server functions @@ -127,15 +126,16 @@ init({Host, Port}) -> %%-------------------------------------------------------------------- %% Received a request when the remote server has already sent us a %% Connection: Close header -handle_call({send_req, _}, +handle_call({send_req, _}, _From, #state{is_closing=true}=State) -> {reply, {error, connection_closing}, State}; -handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, +handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, From, #state{socket=undefined, host=Host, port=Port}=State) -> + Resp_format = get_value(response_format, Options, list), {Host_1, Port_1, State_1} = case get_value(proxy_host, Options, false) of false -> @@ -151,33 +151,41 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, StreamTo = get_value(stream_to, Options, undefined), ReqId = make_req_id(), SaveResponseToFile = get_value(save_response_to_file, Options, false), - NewReq = #request{url=Url, + NewReq = #request{url=Url, method=Method, stream_to=StreamTo, - options=Options, + options=Options, req_id=ReqId, save_response_to_file = SaveResponseToFile, + stream_chunk_size = get_stream_chunk_size(Options), + response_format = Resp_format, from=From}, Reqs = queue:in(NewReq, State#state.reqs), State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}), do_trace("Connecting...~n", []), - Timeout_1 = case Timeout of - infinity -> - infinity; - _ -> - round(Timeout*0.9) - end, - case do_connect(Host_1, Port_1, Options, State_2, Timeout_1) of + Start_ts = now(), + Conn_timeout = get_value(connect_timeout, Options, Timeout), + case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of {ok, Sock} -> + do_trace("Connected!~n", []), + End_ts = now(), Ref = case Timeout of infinity -> undefined; _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) + Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)), + case Rem_time > 0 of + true -> + erlang:send_after(Rem_time, self(), {req_timedout, From}); + false -> + shutting_down(State_2), + do_error_reply(State_2, req_timedout), + exit(normal) + end end, - do_trace("Connected!~n", []), case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of ok -> + do_setopts(Sock, [{active, once}], State_2#state.is_ssl), case StreamTo of undefined -> ok; @@ -188,7 +196,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, send_timer = Ref, cur_req = NewReq, status = get_header}), - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -208,21 +216,24 @@ handle_call({send_req, {Url, Headers, Method, From, #state{socket=Sock, status=Status, reqs=Reqs}=State) -> do_trace("Recvd request in connected state. 
Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]), + Resp_format = get_value(response_format, Options, list), StreamTo = get_value(stream_to, Options, undefined), SaveResponseToFile = get_value(save_response_to_file, Options, false), ReqId = make_req_id(), - NewReq = #request{url=Url, + NewReq = #request{url=Url, stream_to=StreamTo, method=Method, - options=Options, + options=Options, req_id=ReqId, save_response_to_file = SaveResponseToFile, + stream_chunk_size = get_stream_chunk_size(Options), + response_format = Resp_format, from=From}, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of ok -> State_2 = inc_pipeline_counter(State_1), - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), case Timeout of infinity -> ok; @@ -242,7 +253,7 @@ handle_call({send_req, {Url, Headers, Method, _ -> gen_server:reply(From, {ibrowse_req_id, ReqId}) end, - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_1), do_trace("Send request failed: Reason: ~p~n", [Err]), @@ -277,7 +288,8 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_info({tcp, _Sock, Data}, State) -> +handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> + do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> handle_sock_data(Data, State); @@ -293,14 +305,19 @@ handle_info({ssl_closed, _Sock}, State) -> handle_info({req_timedout, From}, State) -> case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of - false -> - {noreply, State}; - {value, _} -> - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State} + false -> + {noreply, State}; + {value, _} -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State} end; +handle_info(timeout, State) -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State}; + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; @@ -353,51 +370,48 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) -> shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; handle_sock_data(Data, #state{status=get_body, content_length=CL, http_status_code = StatCode, - recvd_headers=Headers, + recvd_headers=Headers, chunk_size=CSz, socket=Sock}=State) -> case (CL == undefined) and (CSz == undefined) of true -> case accumulate_response(Data, State) of {error, Reason} -> shutting_down(State), - fail_pipelined_requests(State, + fail_pipelined_requests(State, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; _ -> case parse_11_response(Data, State) of {error, Reason} -> shutting_down(State), - fail_pipelined_requests(State, + fail_pipelined_requests(State, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, 
normal, State}; stop -> shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end end. accumulate_response(Data, #state{ - cur_req = #request{save_response_to_file = SaveResponseToFile, + cur_req = #request{save_response_to_file = true, tmp_file_fd = undefined} = CurReq, - http_status_code=[$2 | _]}=State) when SaveResponseToFile /= false -> - TmpFilename = case SaveResponseToFile of - true -> make_tmp_filename(); - F -> F - end, + http_status_code=[$2 | _]}=State) -> + TmpFilename = make_tmp_filename(), case file:open(TmpFilename, [write, delayed_write, raw]) of {ok, Fd} -> accumulate_response(Data, State#state{ @@ -407,40 +421,53 @@ accumulate_response(Data, {error, Reason} -> {error, {file_open_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, transfer_encoding=chunked, - chunks = Chunks, + reply_buffer = Reply_buf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [Chunks | Data]) of + } = State) -> + case file:write(Fd, [Reply_buf, Data]) of ok -> - State#state{chunks = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, reply_buffer = RepBuf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [RepBuf | Data]) of + } = State) -> + case file:write(Fd, [RepBuf, Data]) of ok -> - State#state{reply_buffer = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response([], State) -> +accumulate_response(<<>>, State) -> State; accumulate_response(Data, #state{reply_buffer = RepBuf, + rep_buf_size = RepBufSize, + streamed_size = Streamed_size, cur_req = CurReq}=State) -> - #request{stream_to=StreamTo, req_id=ReqId} = CurReq, + #request{stream_to=StreamTo, req_id=ReqId, + stream_chunk_size = Stream_chunk_size, + response_format = Response_format} = CurReq, + RepBuf_1 = concat_binary([RepBuf, Data]), + New_data_size = RepBufSize - Streamed_size, case StreamTo of undefined -> - State#state{reply_buffer = [Data | RepBuf]}; + State#state{reply_buffer = RepBuf_1}; + _ when New_data_size < Stream_chunk_size -> + State#state{reply_buffer = RepBuf_1}; _ -> - do_interim_reply(StreamTo, ReqId, Data), - State + {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), + do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), + accumulate_response( + Rem_data, + State#state{ + reply_buffer = <<>>, + streamed_size = Streamed_size + Stream_chunk_size}) end. make_tmp_filename() -> @@ -463,26 +490,26 @@ handle_sock_closed(#state{status=get_header}=State) -> handle_sock_closed(#state{cur_req=undefined} = State) -> shutting_down(State); -%% We check for IsClosing because this the server could have sent a +%% We check for IsClosing because this the server could have sent a %% Connection-Close header and has closed the socket to indicate end %% of response. There maybe requests pipelined which need a response. 
-handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, - is_closing=IsClosing, - cur_req=#request{tmp_file_name=TmpFilename, - tmp_file_fd=Fd} = CurReq, - status=get_body, recvd_headers=Headers}=State) -> - #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq, +handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC, + is_closing = IsClosing, + cur_req = #request{tmp_file_name=TmpFilename, + tmp_file_fd=Fd} = CurReq, + status = get_body, recvd_headers = Headers}=State) -> + #request{from=From, stream_to=StreamTo, req_id=ReqId, + response_format = Resp_format} = CurReq, case IsClosing of true -> {_, Reqs_1} = queue:out(Reqs), case TmpFilename of undefined -> - do_reply(State, From, StreamTo, ReqId, - {ok, SC, Headers, - lists:flatten(lists:reverse(Buf))}); + do_reply(State, From, StreamTo, ReqId, Resp_format, + {ok, SC, Headers, lists:reverse(Buf)}); _ -> file:close(Fd), - do_reply(State, From, StreamTo, ReqId, + do_reply(State, From, StreamTo, ReqId, Resp_format, {ok, SC, Headers, {file, TmpFilename}}) end, do_error_reply(State#state{reqs = Reqs_1}, connection_closed), @@ -493,9 +520,13 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, end. do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) -> - ssl:connect(Host, Port, [{nodelay, true}, {active, false} | SSLOptions], Timeout); + ssl:connect(Host, Port, + [binary, {nodelay, true}, {active, false} | SSLOptions], + Timeout); do_connect(Host, Port, _Options, _State, Timeout) -> - gen_tcp:connect(Host, Port, [{nodelay, true}, {active, false}], Timeout). + gen_tcp:connect(Host, Port, + [binary, {nodelay, true}, {active, false}], + Timeout). do_send(Sock, Req, true) -> ssl:send(Sock, Req); do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req). @@ -542,7 +573,7 @@ check_ssl_options(Options, State) -> send_req_1(#url{abspath = AbsPath, host = Host, - port = Port, + port = Port, path = RelPath} = Url, Headers, Method, Body, Options, Sock, State) -> Headers_1 = add_auth_headers(Url, Options, Headers, State), @@ -555,10 +586,10 @@ send_req_1(#url{abspath = AbsPath, {value, {_, Host_h_val}} -> Host_h_val end, - {Req, Body_1} = make_request(Method, + {Req, Body_1} = make_request(Method, [{"Host", HostHeaderValue} | Headers_1], AbsPath, RelPath, Body, Options, State#state.use_proxy), - case get(my_trace_flag) of + case get(my_trace_flag) of true -> %%Avoid the binary operations if trace is not on... NReq = binary_to_list(list_to_binary(Req)), @@ -569,17 +600,17 @@ send_req_1(#url{abspath = AbsPath, end, SndRes = case do_send(Sock, Req, State#state.is_ssl) of ok -> do_send_body(Sock, Body_1, State#state.is_ssl); - Err -> + Err -> io:format("Err: ~p~n", [Err]), Err end, - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), SndRes. add_auth_headers(#url{username = User, - password = UPw}, + password = UPw}, Options, - Headers, + Headers, #state{use_proxy = UseProxy, proxy_auth_digest = ProxyAuthDigest}) -> Headers_1 = case User of @@ -601,7 +632,7 @@ add_auth_headers(#url{username = User, true -> [{"Proxy-Authorization", ["Basic ", ProxyAuthDigest]} | Headers_1] end. - + http_auth_digest([], []) -> []; http_auth_digest(Username, Password) -> @@ -617,7 +648,7 @@ encode_base64([A,B,C|Ls]) -> encode_base64_do(A,B,C, Ls). 
encode_base64_do(A,B,C, Rest) -> BB = (A bsl 16) bor (B bsl 8) bor C, - [e(BB bsr 18), e((BB bsr 12) band 63), + [e(BB bsr 18), e((BB bsr 12) band 63), e((BB bsr 6) band 63), e(BB band 63)|encode_base64(Rest)]. e(X) when X >= 0, X < 26 -> X+65; @@ -643,12 +674,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) -> _ -> Headers end, - {Headers_2, Body_1} = + {Headers_2, Body_1} = case get_value(transfer_encoding, Options, false) of false -> {Headers_1, Body}; {chunked, ChunkSize} -> - {[{X, Y} || {X, Y} <- Headers_1, + {[{X, Y} || {X, Y} <- Headers_1, X /= "Content-Length", X /= "content-length", X /= content_length] ++ @@ -659,7 +690,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) -> Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of true -> AbsPath; - false -> + false -> RelPath end, {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}. @@ -729,12 +760,12 @@ chunk_request_body(Body, _ChunkSize, Acc) when list(Body) -> parse_response(_Data, #state{cur_req = undefined}=State) -> State#state{status = idle}; -parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, - cur_req=CurReq}=State) -> +parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, + cur_req = CurReq} = State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, - method=Method} = CurReq, + method=Method, response_format = Resp_format} = CurReq, MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity), - case scan_header(Data, Acc) of + case scan_header(Acc, Data) of {yes, Headers, Data_1} -> do_trace("Recvd Header Data -> ~s~n----~n", [Headers]), do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]), @@ -749,7 +780,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, false -> ok end, - State_1 = State#state{recvd_headers=Headers_1, status=get_body, + State_1 = State#state{recvd_headers=Headers_1, status=get_body, + reply_buffer = <<>>, http_status_code=StatCode, is_closing=IsClosing}, put(conn_close, ConnClose), TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")), @@ -757,7 +789,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, _ when Method == head -> {_, Reqs_1} = queue:out(Reqs), send_async_headers(ReqId, StreamTo, StatCode, Headers_1), - State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}), + State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, + {ok, StatCode, Headers_1, []}), cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), @@ -776,7 +809,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, %% RFC2616 - Sec 4.4 {_, Reqs_1} = queue:out(Reqs), send_async_headers(ReqId, StreamTo, StatCode, Headers_1), - State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}), + State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, + {ok, StatCode, Headers_1, []}), cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), @@ -786,9 +820,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), case parse_11_response(Data_1, State_1#state{transfer_encoding=chunked, chunk_size=chunk_start, - reply_buffer=[], chunks=[]}) of + reply_buffer = <<>>}) of {error, 
Reason} -> - fail_pipelined_requests(State_1, + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers_1}}), {error, Reason}; @@ -798,9 +832,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, undefined when HttpVsn == "HTTP/1.0"; ConnClose == "close" -> send_async_headers(ReqId, StreamTo, StatCode, Headers_1), - State_1#state{reply_buffer=[Data_1]}; + State_1#state{reply_buffer = Data_1}; undefined -> - fail_pipelined_requests(State_1, + fail_pipelined_requests(State_1, {error, {content_length_undefined, {stat_code, StatCode}, Headers}}), {error, content_length_undefined}; @@ -810,11 +844,11 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), do_trace("Recvd Content-Length of ~p~n", [V_1]), State_2 = State_1#state{rep_buf_size=0, - reply_buffer=[], + reply_buffer = <<>>, content_length=V_1}, case parse_11_response(Data_1, State_2) of {error, Reason} -> - fail_pipelined_requests(State_1, + fail_pipelined_requests(State_1, {error, {Reason, {stat_code, StatCode}, Headers_1}}), {error, Reason}; @@ -822,16 +856,16 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, State_3 end; _ -> - fail_pipelined_requests(State_1, + fail_pipelined_requests(State_1, {error, {content_length_undefined, {stat_code, StatCode}, Headers}}), {error, content_length_undefined} end end; {no, Acc_1} when MaxHeaderSize == infinity -> - State#state{reply_buffer=Acc_1}; - {no, Acc_1} when length(Acc_1) < MaxHeaderSize -> - State#state{reply_buffer=Acc_1}; + State#state{reply_buffer = Acc_1}; + {no, Acc_1} when size(Acc_1) < MaxHeaderSize -> + State#state{reply_buffer = Acc_1}; {no, _Acc_1} -> fail_pipelined_requests(State, {error, max_headers_size_exceeded}), {error, max_headers_size_exceeded} @@ -843,153 +877,131 @@ is_connection_closing("HTTP/1.0", "false") -> true; is_connection_closing(_, _) -> false. %% This clause determines the chunk size when given data from the beginning of the chunk -parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, +parse_11_response(DataRecvd, + #state{transfer_encoding=chunked, chunk_size=chunk_start, - cur_req=CurReq, - reply_buffer=Buf}=State) -> - case scan_crlf(DataRecvd, Buf) of + chunk_size_buffer = Chunk_sz_buf + } = State) -> + case scan_crlf(Chunk_sz_buf, DataRecvd) of {yes, ChunkHeader, Data_1} -> case parse_chunk_header(ChunkHeader) of {error, Reason} -> {error, Reason}; ChunkSize -> - #request{stream_to=StreamTo, req_id=ReqId} = CurReq, %% - %% Do we have to preserve the chunk encoding when streaming? + %% Do we have to preserve the chunk encoding when + %% streaming? NO. This should be transparent to the client + %% process. Chunked encoding was only introduced to make + %% it efficient for the server. %% - do_interim_reply(StreamTo, ReqId, {chunk_start, ChunkSize}), - RemLen = length(Data_1), + RemLen = size(Data_1), do_trace("Determined chunk size: ~p. 
Already recvd: ~p~n", [ChunkSize, RemLen]), - parse_11_response(Data_1, State#state{rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true, - chunk_size=ChunkSize}) + parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>, + deleted_crlf = true, + recvd_chunk_size = 0, + chunk_size = ChunkSize}) end; {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} + State#state{chunk_size_buffer = Data_1} end; -%% This clause is there to remove the CRLF between two chunks -%% -parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, - chunk_size=tbd, - chunks = Chunks, - cur_req=CurReq, - reply_buffer=Buf}=State) -> - case scan_crlf(DataRecvd, Buf) of +%% This clause is to remove the CRLF between two chunks +%% +parse_11_response(DataRecvd, + #state{transfer_encoding = chunked, + chunk_size = tbd, + chunk_size_buffer = Buf}=State) -> + case scan_crlf(Buf, DataRecvd) of {yes, _, NextChunk} -> - #request{stream_to=StreamTo, req_id=ReqId} = CurReq, - %% - %% Do we have to preserve the chunk encoding when streaming? - %% - State_1 = State#state{chunk_size=chunk_start, - rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true}, - State_2 = case StreamTo of - undefined -> - State_1#state{chunks = [Buf | Chunks]}; - _ -> - do_interim_reply(StreamTo, ReqId, chunk_end), - State_1 - end, - parse_11_response(NextChunk, State_2); + State_1 = State#state{chunk_size = chunk_start, + chunk_size_buffer = <<>>, +%% reply_buffer = Buf_1, + deleted_crlf = true}, + parse_11_response(NextChunk, State_1); {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} +%% State#state{reply_buffer = Data_1, rep_buf_size = size(Data_1)} + State#state{chunk_size_buffer = Data_1} end; %% This clause deals with the end of a chunked transfer -parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=0, - cur_req=CurReq, +parse_11_response(DataRecvd, + #state{transfer_encoding = chunked, chunk_size = 0, + cur_req = CurReq, deleted_crlf = DelCrlf, - reply_buffer=Trailer, reqs=Reqs}=State) -> + reply_buffer = Trailer, reqs = Reqs}=State) -> do_trace("Detected end of chunked transfer...~n", []), DataRecvd_1 = case DelCrlf of - false -> + false -> DataRecvd; true -> - [$\r, $\n | DataRecvd] - end, - #request{stream_to=StreamTo, req_id=ReqId} = CurReq, - case scan_header(DataRecvd_1, Trailer) of + <<$\r, $\n, DataRecvd/binary>> + end, + case scan_header(Trailer, DataRecvd_1) of {yes, _TEHeaders, Rem} -> {_, Reqs_1} = queue:out(Reqs), - %% - %% Do we have to preserve the chunk encoding when streaming? - %% - do_interim_reply(StreamTo, ReqId, chunk_end), - State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}), + State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}), parse_response(Rem, reset_state(State_1)); {no, Rem} -> - State#state{reply_buffer=Rem, rep_buf_size=length(Rem), deleted_crlf=false} + State#state{reply_buffer = Rem, rep_buf_size = size(Rem), deleted_crlf = false} end; %% This clause extracts a chunk, given the size. -parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=CSz, - rep_buf_size=RepBufSz}=State) -> - NeedBytes = CSz - RepBufSz, - DataLen = length(DataRecvd), +parse_11_response(DataRecvd, + #state{transfer_encoding = chunked, + chunk_size = CSz, + recvd_chunk_size = Recvd_csz, + rep_buf_size = RepBufSz} = State) -> + NeedBytes = CSz - Recvd_csz, + DataLen = size(DataRecvd), do_trace("Recvd more data: size: ~p. 
NeedBytes: ~p~n", [DataLen, NeedBytes]), case DataLen >= NeedBytes of true -> - {RemChunk, RemData} = split_list_at(DataRecvd, NeedBytes), + {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), do_trace("Recvd another chunk...~n", []), do_trace("RemData -> ~p~n", [RemData]), case accumulate_response(RemChunk, State) of {error, Reason} -> do_trace("Error accumulating response --> ~p~n", [Reason]), {error, Reason}; - #state{reply_buffer = NewRepBuf, - chunks = NewChunks} = State_1 -> - State_2 = State_1#state{reply_buffer=[], - chunks = [lists:reverse(NewRepBuf) | NewChunks], - rep_buf_size=0, - chunk_size=tbd}, + #state{} = State_1 -> + State_2 = State_1#state{chunk_size=tbd}, parse_11_response(RemData, State_2) end; false -> - accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz + DataLen}) + accumulate_response(DataRecvd, + State#state{rep_buf_size = RepBufSz + DataLen, + recvd_chunk_size = Recvd_csz + DataLen}) end; %% This clause to extract the body when Content-Length is specified -parse_11_response(DataRecvd, - #state{content_length=CL, rep_buf_size=RepBufSz, +parse_11_response(DataRecvd, + #state{content_length=CL, rep_buf_size=RepBufSz, reqs=Reqs}=State) -> NeedBytes = CL - RepBufSz, - DataLen = length(DataRecvd), + DataLen = size(DataRecvd), case DataLen >= NeedBytes of true -> - {RemBody, Rem} = split_list_at(DataRecvd, NeedBytes), + {RemBody, Rem} = split_binary(DataRecvd, NeedBytes), {_, Reqs_1} = queue:out(Reqs), State_1 = accumulate_response(RemBody, State), State_2 = handle_response(State_1#state.cur_req, State_1#state{reqs=Reqs_1}), State_3 = reset_state(State_2), parse_response(Rem, State_3); false -> - accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz+DataLen}) + accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)}) end. 
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, - save_response_to_file = SaveResponseToFile, + response_format = Resp_format, + save_response_to_file = SaveResponseToFile, tmp_file_name = TmpFilename, tmp_file_fd = Fd }, #state{http_status_code = SCode, send_timer = ReqTimer, reply_buffer = RepBuf, - transfer_encoding = TEnc, - chunks = Chunks, recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false -> - Body = case TEnc of - chunked -> - lists:flatten(lists:reverse(Chunks)); - _ -> - lists:flatten(lists:reverse(RepBuf)) - end, + Body = RepBuf, State_1 = set_cur_request(State), file:close(Fd), ResponseBody = case TmpFilename of @@ -998,34 +1010,42 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, _ -> {file, TmpFilename} end, - State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, ResponseBody}), + State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, + {ok, SCode, RespHeaders, ResponseBody}), cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}), State_2; -handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId}, +handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, + response_format = Resp_format}, #state{http_status_code=SCode, recvd_headers=RespHeaders, - reply_buffer=RepBuf, transfer_encoding=TEnc, - chunks=Chunks, send_timer=ReqTimer}=State) -> - Body = case TEnc of - chunked -> - lists:flatten(lists:reverse(Chunks)); - _ -> - lists:flatten(lists:reverse(RepBuf)) - end, - State_1 = set_cur_request(State), - case get(conn_close) of + reply_buffer = RepBuf, + send_timer=ReqTimer}=State) -> + Body = RepBuf, +%% State_1 = set_cur_request(State), + State_1 = case get(conn_close) of "close" -> - do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}), + do_reply(State, From, StreamTo, ReqId, Resp_format, + {ok, SCode, RespHeaders, Body}), exit(normal); _ -> - State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}), + State_1_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, + {ok, SCode, RespHeaders, Body}), cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}), - State_2 - end. + State_1_1 + end, + set_cur_request(State_1). reset_state(State) -> - State#state{status=get_header, rep_buf_size=0,content_length=undefined, - reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false, - http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}. + State#state{status = get_header, + rep_buf_size = 0, + streamed_size = 0, + content_length = undefined, + reply_buffer = <<>>, + chunk_size_buffer = <<>>, + recvd_headers = [], + deleted_crlf = false, + http_status_code = undefined, + chunk_size = undefined, + transfer_encoding = undefined}. set_cur_request(#state{reqs = Reqs} = State) -> case queue:to_list(Reqs) of @@ -1036,7 +1056,7 @@ set_cur_request(#state{reqs = Reqs} = State) -> end. parse_headers(Headers) -> - case scan_crlf(Headers, []) of + case scan_crlf(Headers) of {yes, StatusLine, T} -> Headers_1 = parse_headers_1(T), case parse_status_line(StatusLine) of @@ -1059,22 +1079,24 @@ parse_headers(Headers) -> % SP. A recipient MAY replace any linear white space with a single % SP before interpreting the field value or forwarding the message % downstream. +parse_headers_1(B) when is_binary(B) -> + parse_headers_1(binary_to_list(B)); parse_headers_1(String) -> parse_headers_1(String, [], []). 
parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32; - H == $\t -> + H == $\t -> parse_headers_1(lists:dropwhile(fun(X) -> is_whitespace(X) end, T), [32 | L], Acc); -parse_headers_1([$\n|T], [$\r | L], Acc) -> +parse_headers_1([$\n|T], [$\r | L], Acc) -> case parse_header(lists:reverse(L)) of invalid -> parse_headers_1(T, [], Acc); NewHeader -> parse_headers_1(T, [], [NewHeader | Acc]) end; -parse_headers_1([H|T], L, Acc) -> +parse_headers_1([H|T], L, Acc) -> parse_headers_1(T, [H|L], Acc); parse_headers_1([], [], Acc) -> lists:reverse(Acc); @@ -1087,6 +1109,8 @@ parse_headers_1([], L, Acc) -> end, lists:reverse(Acc_1). +parse_status_line(Line) when is_binary(Line) -> + parse_status_line(binary_to_list(Line)); parse_status_line(Line) -> parse_status_line(Line, get_prot_vsn, [], []). parse_status_line([32 | T], get_prot_vsn, ProtVsn, StatCode) -> @@ -1100,6 +1124,8 @@ parse_status_line([H | T], get_status_code, ProtVsn, StatCode) -> parse_status_line([], _, _, _) -> http_09. +parse_header(B) when is_binary(B) -> + parse_header(binary_to_list(B)); parse_header(L) -> parse_header(L, []). parse_header([$: | V], Acc) -> @@ -1109,13 +1135,75 @@ parse_header([H | T], Acc) -> parse_header([], _) -> invalid. -scan_header([$\n|T], [$\r,$\n,$\r|L]) -> {yes, lists:reverse([$\n,$\r| L]), T}; -scan_header([H|T], L) -> scan_header(T, [H|L]); -scan_header([], L) -> {no, L}. +scan_header(Bin) -> + case get_crlf_crlf_pos(Bin, 0) of + {yes, Pos} -> + {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos), + {yes, Headers, Body}; + no -> + {no, Bin} + end. + +scan_header(Bin1, Bin2) when size(Bin1) < 4 -> + scan_header(<<Bin1/binary, Bin2/binary>>); +scan_header(Bin1, <<>>) -> + scan_header(Bin1); +scan_header(Bin1, Bin2) -> + Bin1_already_scanned_size = size(Bin1) - 4, + <<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1, + Bin_to_scan = <<Rest/binary, Bin2/binary>>, + case get_crlf_crlf_pos(Bin_to_scan, 0) of + {yes, Pos} -> + {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), + {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body}; + no -> + {no, <<Bin1/binary, Bin2/binary>>} + end. + +get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1); +get_crlf_crlf_pos(<<>>, _) -> no. + +scan_crlf(Bin) -> + case get_crlf_pos(Bin) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos), + {yes, Prefix, Suffix}; + no -> + {no, Bin} + end. + +scan_crlf(<<>>, Bin2) -> + scan_crlf(Bin2); +scan_crlf(Bin1, Bin2) when size(Bin1) < 2 -> + scan_crlf(<<Bin1/binary, Bin2/binary>>); +scan_crlf(Bin1, Bin2) -> + scan_crlf_1(size(Bin1) - 2, Bin1, Bin2). + +scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> + <<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1, + Bin3 = <<Bin1_tail/binary, Bin2/binary>>, + case get_crlf_pos(Bin3) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos), + {yes, concat_binary([Bin1_head, Prefix]), Suffix}; + no -> + {no, concat_binary([Bin1, Bin2])} + end. + +get_crlf_pos(Bin) -> + get_crlf_pos(Bin, 0). + +get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1); +get_crlf_pos(<<>>, _) -> no. -scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; -scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); -scan_crlf([], L) -> {no, L}. +%% scan_crlf(<<$\n, T/binary>>, [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf(<<H, T/binary>>, L) -> scan_crlf(T, [H|L]); +%% scan_crlf(<<>>, L) -> {no, L}; +%% scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); +%% scan_crlf([], L) -> {no, L}.
fmt_val(L) when list(L) -> L; fmt_val(I) when integer(I) -> integer_to_list(I); @@ -1173,19 +1261,19 @@ parse_chunk_header([]) -> parse_chunk_header(ChunkHeader) -> parse_chunk_header(ChunkHeader, []). -parse_chunk_header([$; | _], Acc) -> +parse_chunk_header(<<$;, _/binary>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)); -parse_chunk_header([H | T], Acc) -> +parse_chunk_header(<<H, T/binary>>, Acc) -> case is_whitespace(H) of true -> parse_chunk_header(T, Acc); false -> parse_chunk_header(T, [H | Acc]) end; -parse_chunk_header([], Acc) -> +parse_chunk_header(<<>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)). -is_whitespace(32) -> true; +is_whitespace($\s) -> true; is_whitespace($\r) -> true; is_whitespace($\n) -> true; is_whitespace($\t) -> true; @@ -1197,36 +1285,64 @@ send_async_headers(_ReqId, undefined, _StatCode, _Headers) -> send_async_headers(ReqId, StreamTo, StatCode, Headers) -> catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}. -do_reply(State, From, undefined, _, Msg) -> +format_response_data(Resp_format, Body) -> + case Resp_format of + list when is_list(Body) -> + flatten(Body); + list when is_binary(Body) -> + binary_to_list(Body); + binary when is_list(Body) -> + list_to_binary(Body); + _ -> + %% This is to cater for sending messages such as + %% {chunk_start, _}, chunk_end etc + Body + end. + +do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -> + Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)}, + gen_server:reply(From, Msg_1), + dec_pipeline_counter(State); +do_reply(State, From, undefined, _, _, Msg) -> gen_server:reply(From, Msg), dec_pipeline_counter(State); -do_reply(State, _From, StreamTo, ReqId, {ok, _, _, _}) -> +do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) -> State_1 = dec_pipeline_counter(State), + case Body of + [] -> + ok; + _ -> + Body_1 = format_response_data(Resp_format, Body), + catch StreamTo ! {ibrowse_async_response, ReqId, Body_1} + end, catch StreamTo ! {ibrowse_async_response_end, ReqId}, State_1; -do_reply(State, _From, StreamTo, ReqId, Msg) -> +do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) -> State_1 = dec_pipeline_counter(State), - catch StreamTo ! {ibrowse_async_response, ReqId, Msg}, + Msg_1 = format_response_data(Resp_format, Msg), + catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}, State_1. -do_interim_reply(undefined, _ReqId, _Msg) -> +do_interim_reply(undefined, _, _ReqId, _Msg) -> ok; -do_interim_reply(StreamTo, ReqId, Msg) -> - catch StreamTo ! {ibrowse_async_response, ReqId, Msg}. +do_interim_reply(StreamTo, Response_format, ReqId, Msg) -> + Msg_1 = format_response_data(Response_format, Msg), + catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}. do_error_reply(#state{reqs = Reqs} = State, Err) -> ReqList = queue:to_list(Reqs), - lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId}) -> - do_reply(State, From, StreamTo, ReqId, {error, Err}) + lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId, + response_format = Resp_format}) -> + do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err}) end, ReqList).
fail_pipelined_requests(#state{reqs = Reqs, cur_req = CurReq} = State, Reply) -> {_, Reqs_1} = queue:out(Reqs), - #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq, - do_reply(State, From, StreamTo, ReqId, Reply), + #request{from=From, stream_to=StreamTo, req_id=ReqId, + response_format = Resp_format} = CurReq, + do_reply(State, From, StreamTo, ReqId, Resp_format, Reply), do_error_reply(State#state{reqs = Reqs_1}, previous_request_failed). - split_list_at(List, N) -> split_list_at(List, N, []). split_list_at([], _, Acc) -> @@ -1271,7 +1387,7 @@ cancel_timer(Ref) -> erlang:cancel_timer(Ref). cancel_timer(Ref, {eat_message, Msg}) -> cancel_timer(Ref), - receive + receive Msg -> ok after 0 -> @@ -1310,3 +1426,23 @@ dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, ets:delete(Tid, {Pipe_sz, self()}), ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), State#state{cur_pipeline_size = Pipe_sz - 1}. + +flatten([H | _] = L) when is_integer(H) -> + L; +flatten([H | _] = L) when is_list(H) -> + lists:flatten(L); +flatten([]) -> + []. + +get_stream_chunk_size(Options) -> + case lists:keysearch(stream_chunk_size, 1, Options) of + {value, {_, V}} when V > 0 -> + V; + _ -> + ?DEFAULT_STREAM_CHUNK_SIZE + end. + +get_inac_timeout(#state{cur_req = #request{options = Opts}}) -> + get_value(inactivity_timeout, Opts, infinity); +get_inac_timeout(#state{cur_req = undefined}) -> + infinity. diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index 03dc4e0..9212ccd 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- -module(ibrowse_lb). --vsn('$Id: ibrowse_lb.erl,v 1.1 2008/03/27 01:36:21 chandrusf Exp $ '). +-vsn('$Id: ibrowse_lb.erl,v 1.2 2009/07/01 22:43:19 chandrusf Exp $ '). -author(chandru). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -39,13 +39,6 @@ max_pipeline_size, num_cur_sessions = 0}). --import(ibrowse_lib, [ - parse_url/1, - printable_date/0, - get_value/3 - ]). - - -include("ibrowse.hrl"). %%==================================================================== diff --git a/src/ibrowse_test.erl b/src/ibrowse_test.erl index b4429c9..f3559b5 100644 --- a/src/ibrowse_test.erl +++ b/src/ibrowse_test.erl @@ -4,21 +4,23 @@ %%% Created : 14 Oct 2003 by Chandrashekhar Mullaparthi -module(ibrowse_test). --vsn('$Id: ibrowse_test.erl,v 1.3 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse_test.erl,v 1.4 2009/07/01 22:43:19 chandrusf Exp $ '). -export([ load_test/3, send_reqs_1/3, do_send_req/2, unit_tests/0, unit_tests/1, + unit_tests_1/2, drv_ue_test/0, drv_ue_test/1, ue_test/0, - ue_test/1 + ue_test/1, + verify_chunked_streaming/0, + verify_chunked_streaming/1, + i_do_async_req_list/4 ]). --import(ibrowse_lib, [printable_date/0]). - %% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to %% tweak settings before running the load test. The defaults are 10 and 10. load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url), @@ -46,7 +48,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) -> log_msg("End time : ~1000.p~n", [calendar:now_to_local_time(End_time)]), Elapsed_time_secs = trunc(timer:now_diff(End_time, Start_time) / 1000000), log_msg("Elapsed : ~p~n", [Elapsed_time_secs]), - log_msg("Reqs/sec : ~p~n", [(NumWorkers*NumReqsPerWorker) / Elapsed_time_secs]), + log_msg("Reqs/sec : ~p~n", [round(trunc((NumWorkers*NumReqsPerWorker) / Elapsed_time_secs))]), dump_errors(). 
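%% Usage sketch (hypothetical URL and counts): as the comment above
%% load_test/3 notes, raise the limits with ibrowse:set_max_sessions/3
%% and ibrowse:set_max_pipeline_size/3 first if the defaults of 10 and
%% 10 are too low, then run e.g.:
%%
%%   ibrowse_test:load_test("http://localhost:8081/", 10, 100).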
 init_results() ->
@@ -88,7 +90,7 @@ do_wait() ->
             do_wait()
     end
     end.
-    
+
 do_send_req(Url, NumReqs) ->
     do_send_req_1(Url, NumReqs).
@@ -149,7 +151,7 @@ dump_errors(Key, Iod) ->
 -define(TEST_LIST, [{"http://intranet/messenger", get},
                     {"http://www.google.co.uk", get},
                     {"http://www.google.com", get},
-                    {"http://www.google.com", options}, 
+                    {"http://www.google.com", options},
                     {"http://www.sun.com", get},
                     {"http://www.oracle.com", get},
                     {"http://www.bbc.co.uk", get},
@@ -172,26 +174,129 @@ dump_errors(Key, Iod) ->
                     {"http://jigsaw.w3.org/HTTP/400/toolong/", get},
                     {"http://jigsaw.w3.org/HTTP/300/", get},
                     {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
-                    {"http://jigsaw.w3.org/HTTP/CL/", get}
+                    {"http://jigsaw.w3.org/HTTP/CL/", get},
+                    {"http://www.httpwatch.com/httpgallery/chunked/", get}
                    ]).
 
 unit_tests() ->
     unit_tests([]).
 
 unit_tests(Options) ->
+    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options]),
+    receive
+        {done, Pid} ->
+            ok;
+        {'DOWN', Ref, _, _, Info} ->
+            io:format("Test process crashed: ~p~n", [Info])
+    after 60000 ->
+            io:format("Timed out waiting for tests to complete~n", [])
+    end.
+
+unit_tests_1(Parent, Options) ->
     lists:foreach(fun({Url, Method}) ->
                           execute_req(Url, Method, Options);
                      ({Url, Method, X_Opts}) ->
                           execute_req(Url, Method, X_Opts ++ Options)
-                  end, ?TEST_LIST).
+                  end, ?TEST_LIST),
+    Parent ! {done, self()}.
+
+verify_chunked_streaming() ->
+    verify_chunked_streaming([]).
+
+verify_chunked_streaming(Options) ->
+    Url = "http://www.httpwatch.com/httpgallery/chunked/",
+    io:format("URL: ~s~n", [Url]),
+    io:format("Fetching data without streaming...~n", []),
+    Result_without_streaming = ibrowse:send_req(
+                                 Url, [], get, [],
+                                 [{response_format, binary} | Options]),
+    io:format("Fetching data with streaming as list...~n", []),
+    Async_response_list = do_async_req_list(
+                            Url, get, [{response_format, list} | Options]),
+    io:format("Fetching data with streaming as binary...~n", []),
+    Async_response_bin = do_async_req_list(
+                           Url, get, [{response_format, binary} | Options]),
+    compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+
+compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
+    success;
+compare_responses({ok, St_code, _, Body_1}, {ok, St_code, _, Body_2}, {ok, St_code, _, Body_3}) ->
+    case Body_1 of
+        Body_2 ->
+            io:format("Body_1 and Body_2 match~n", []);
+        Body_3 ->
+            io:format("Body_1 and Body_3 match~n", []);
+        _ when Body_2 == Body_3 ->
+            io:format("Body_2 and Body_3 match~n", []);
+        _ ->
+            io:format("All three bodies are different!~n", [])
+    end,
+    io:format("Body_1 -> ~p~n", [Body_1]),
+    io:format("Body_2 -> ~p~n", [Body_2]),
+    io:format("Body_3 -> ~p~n", [Body_3]),
+    fail_bodies_mismatch;
+compare_responses(R1, R2, R3) ->
+    io:format("R1 -> ~p~n", [R1]),
+    io:format("R2 -> ~p~n", [R2]),
+    io:format("R3 -> ~p~n", [R3]),
+    fail.
+
+%% do_async_req_list(Url) ->
+%%     do_async_req_list(Url, get).
+
+%% do_async_req_list(Url, Method) ->
+%%     do_async_req_list(Url, Method, [{stream_to, self()},
+%%                                     {stream_chunk_size, 1000}]).
 
-execute_req(Url, Method) ->
-    execute_req(Url, Method, []).
+do_async_req_list(Url, Method, Options) ->
+    {Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list,
+                                   [self(), Url, Method,
+                                    Options ++ [{stream_chunk_size, 1000}]]),
+    io:format("Spawned process ~p~n", [Pid]),
+    wait_for_resp(Pid).
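+%%---------------------------------------------------------------------
+%% Usage sketch (editor's illustration, not part of this commit): the
+%% test entry points added above can be run directly from the shell;
+%% do_async_req_list/3 shows the pattern of forcing a small
+%% stream_chunk_size so the chunked body arrives in several
+%% ibrowse_async_response messages.
+%%
+%% 1> ibrowse:start().
+%% 2> ibrowse_test:unit_tests().
+%% 3> ibrowse_test:verify_chunked_streaming().
+%%
+%% verify_chunked_streaming/0 returns 'success' only when the streamed
+%% list and binary bodies match the response fetched without streaming.
+%%---------------------------------------------------------------------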
+
+wait_for_resp(Pid) ->
+    receive
+        {async_result, Pid, Res} ->
+            Res;
+        {'DOWN', _, _, Pid, Reason} ->
+            {'EXIT', Reason};
+        {'DOWN', _, _, _, _} ->
+            wait_for_resp(Pid);
+        Msg ->
+            io:format("Recvd unknown message: ~p~n", [Msg]),
+            wait_for_resp(Pid)
+    after 10000 ->
+            {error, timeout}
+    end.
+
+i_do_async_req_list(Parent, Url, Method, Options) ->
+    Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+    case Res of
+        {ibrowse_req_id, Req_id} ->
+            Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+            Parent ! {async_result, self(), Result};
+        Err ->
+            Parent ! {async_result, self(), Err}
+    end.
+
+wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+    receive
+        {ibrowse_async_headers, Req_id, StatCode, Headers} ->
+            wait_for_async_resp(Req_id, StatCode, Headers, Body);
+        {ibrowse_async_response_end, Req_id} ->
+            Body_1 = list_to_binary(lists:reverse(Body)),
+            {ok, Acc_Stat_code, Acc_Headers, Body_1};
+        {ibrowse_async_response, Req_id, Data} ->
+            wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+        Err ->
+            {ok, Acc_Stat_code, Acc_Headers, Err}
+    end.
 
 execute_req(Url, Method, Options) ->
-    io:format("~s, ~p: ", [Url, Method]),
+    io:format("~7.7w, ~50.50s: ", [Method, Url]),
     Result = (catch ibrowse:send_req(Url, [], Method, [], Options)),
-    case Result of 
+    case Result of
         {ok, SCode, _H, _B} ->
             io:format("Status code: ~p~n", [SCode]);
         Err ->
diff --git a/vsn.mk b/vsn.mk
index ce4cd17..f99f439 100644
--- a/vsn.mk
+++ b/vsn.mk
@@ -1,2 +1,2 @@
-IBROWSE_VSN = 1.4.1
+IBROWSE_VSN = 1.5.0
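A closing usage sketch (editor's illustration; the function names and
option values are examples, not part of this patch): a caller consuming
the asynchronous messages handled by wait_for_async_resp/4 above,
combined with the per-request options read by get_stream_chunk_size/1
and get_inac_timeout/1.

    stream_to_self(Url) ->
        case ibrowse:send_req(Url, [], get, [],
                              [{stream_to, self()},
                               {response_format, binary},
                               {stream_chunk_size, 4096},
                               {inactivity_timeout, 30000}]) of
            {ibrowse_req_id, Req_id} ->
                collect(Req_id, []);
            Err ->
                Err
        end.

    collect(Req_id, Acc) ->
        receive
            {ibrowse_async_headers, Req_id, Status, _Headers} ->
                io:format("Status: ~s~n", [Status]),
                collect(Req_id, Acc);
            {ibrowse_async_response, Req_id, Data} ->
                collect(Req_id, [Data | Acc]);
            {ibrowse_async_response_end, Req_id} ->
                {ok, list_to_binary(lists:reverse(Acc))}
        after 10000 ->
                {error, timeout}
        end.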