From 5b13e5b245e14e554beb4d408a524d4bbd7b841c Mon Sep 17 00:00:00 2001 From: Chandrashekhar Mullaparthi Date: Mon, 20 Dec 2010 12:01:26 +0000 Subject: [PATCH] Pipelining wasn't working when used with the {stream_to, {Pid, once}} option --- README | 7 ++- doc/ibrowse.html | 2 +- ebin/ibrowse.app | 2 +- src/ibrowse.erl | 2 +- src/ibrowse_http_client.erl | 35 +++++++++++--- src/ibrowse_test.erl | 93 ++++++++++++++++++++++++++++++++++-- test/ibrowse_test_server.erl | 83 +++++++++++++++++++++++++++++++- vsn.mk | 2 +- 8 files changed, 210 insertions(+), 16 deletions(-) diff --git a/README b/README index e6f8cd5..d47f28d 100644 --- a/README +++ b/README @@ -18,7 +18,7 @@ ibrowse is available under two different licenses. LGPL or the BSD license. Comments to : Chandrashekhar.Mullaparthi@gmail.com -Version : 2.1.1 +Version : 2.1.2 Latest version : git://github.com/cmullaparthi/ibrowse.git @@ -60,6 +60,11 @@ tholschuh (https://github.com/tholschuh/) CONTRIBUTIONS & CHANGE HISTORY ============================== +20-12-2010 - v2.1.2 + * Pipelining wasn't working when used in conjunction with the + {stream_to, {self(), once}} option. Bug report by + Filipe David Manana. + 10-12-2010 - v2.1.1 * Fix for https://github.com/cmullaparthi/ibrowse/issues/issue/20 by Filipe David Manana diff --git a/doc/ibrowse.html b/doc/ibrowse.html index 3aec4a8..1594d74 100644 --- a/doc/ibrowse.html +++ b/doc/ibrowse.html @@ -12,7 +12,7 @@ The ibrowse application implements an HTTP 1.1 client in erlang.

Copyright © 2005-2010 Chandrashekhar Mullaparthi

-

Version: 2.1.0

+

Version: 2.1.2

Behaviours: gen_server.

Authors: Chandrashekhar Mullaparthi (chandrashekhar dot mullaparthi at gmail dot com).

diff --git a/ebin/ibrowse.app b/ebin/ibrowse.app index aee0f20..c8e4227 100644 --- a/ebin/ibrowse.app +++ b/ebin/ibrowse.app @@ -1,6 +1,6 @@ {application, ibrowse, [{description, "HTTP client application"}, - {vsn, "2.1.1"}, + {vsn, "2.1.2"}, {modules, [ ibrowse, ibrowse_http_client, ibrowse_app, diff --git a/src/ibrowse.erl b/src/ibrowse.erl index 6e20cfb..e105150 100644 --- a/src/ibrowse.erl +++ b/src/ibrowse.erl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi %% @copyright 2005-2010 Chandrashekhar Mullaparthi -%% @version 2.1.1 +%% @version 2.1.2 %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 0135a49..ea75948 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -191,6 +191,14 @@ handle_info({stream_next, Req_id}, #state{socket = Socket, {noreply, State}; handle_info({stream_next, _Req_id}, State) -> + _Cur_req_id = case State#state.cur_req of + #request{req_id = Cur} -> + Cur; + _ -> + undefined + end, +%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n", +%% [_Req_id, _Cur_req_id]), {noreply, State}; handle_info({stream_close, _Req_id}, State) -> @@ -625,7 +633,7 @@ send_req_1(From, Path = [Server_host, $:, integer_to_list(Server_port)], {Req, Body_1} = make_request(connect, Pxy_auth_headers, Path, Path, - [], Options, State_1), + [], Options, State_1, undefined), TE = is_chunked_encoding_specified(Options), trace_request(Req), case do_send(Req, State) of @@ -711,7 +719,8 @@ send_req_1(From, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), {Req, Body_1} = make_request(Method, Headers_1, - AbsPath, RelPath, Body, Options, State_1), + AbsPath, RelPath, Body, Options, State_1, + ReqId), trace_request(Req), do_setopts(Socket, Caller_socket_options, State_1), TE = is_chunked_encoding_specified(Options), @@ -811,7 +820,7 @@ http_auth_digest(Username, Password) -> ibrowse_lib:encode_base64(Username ++ [$: | Password]). make_request(Method, Headers, AbsPath, RelPath, Body, Options, - #state{use_proxy = UseProxy, is_ssl = Is_ssl}) -> + #state{use_proxy = UseProxy, is_ssl = Is_ssl}, ReqId) -> HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})), Fun1 = fun({X, Y}) when is_atom(X) -> {to_lower(atom_to_list(X)), X, Y}; @@ -847,7 +856,13 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, [{"Transfer-Encoding", "chunked"}], chunk_request_body(Body, Chunk_size_1)} end, - Headers_3 = cons_headers(Headers_2), + Headers_3 = case lists:member({include_ibrowse_req_id, true}, Options) of + true -> + [{"x-ibrowse-request-id", io_lib:format("~1000.p",[ReqId])} | Headers_2]; + false -> + Headers_2 + end, + Headers_4 = cons_headers(Headers_3), Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of true -> case Is_ssl of @@ -859,7 +874,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, false -> RelPath end, - {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}. + {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_4, crnl()], Body_1}. is_chunked_encoding_specified(Options) -> case get_value(transfer_encoding, Options, false) of @@ -1303,11 +1318,17 @@ reset_state(State) -> transfer_encoding = undefined }. -set_cur_request(#state{reqs = Reqs} = State) -> +set_cur_request(#state{reqs = Reqs, socket = Socket} = State) -> case queue:to_list(Reqs) of [] -> State#state{cur_req = undefined}; - [NextReq | _] -> + [#request{caller_controls_socket = Ccs} = NextReq | _] -> + case Ccs of + true -> + do_setopts(Socket, [{active, once}], State); + _ -> + ok + end, State#state{cur_req = NextReq} end. diff --git a/src/ibrowse_test.erl b/src/ibrowse_test.erl index 3ad7660..b8e0a4a 100644 --- a/src/ibrowse_test.erl +++ b/src/ibrowse_test.erl @@ -20,7 +20,8 @@ test_chunked_streaming_once/0, i_do_async_req_list/4, test_stream_once/3, - test_stream_once/4 + test_stream_once/4, + test_20122010/0 ]). test_stream_once(Url, Method, Options) -> @@ -218,7 +219,8 @@ dump_errors(Key, Iod) -> {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]}, {"http://jigsaw.w3.org/HTTP/CL/", get}, {"http://www.httpwatch.com/httpgallery/chunked/", get}, - {"https://github.com", get, [{ssl_options, [{depth, 2}]}]} + {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}, + {local_test_fun, test_20122010, []} ]). unit_tests() -> @@ -228,6 +230,7 @@ unit_tests(Options) -> application:start(crypto), application:start(public_key), application:start(ssl), + (catch ibrowse_test_server:start_server(8181, tcp)), ibrowse:start(), Options_1 = Options ++ [{connect_timeout, 5000}], {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]), @@ -242,7 +245,9 @@ unit_tests(Options) -> end. unit_tests_1(Parent, Options) -> - lists:foreach(fun({Url, Method}) -> + lists:foreach(fun({local_test_fun, Fun_name, Args}) -> + execute_req(local_test_fun, Fun_name, Args); + ({Url, Method}) -> execute_req(Url, Method, Options); ({Url, Method, X_Opts}) -> execute_req(Url, Method, X_Opts ++ Options) @@ -394,6 +399,10 @@ maybe_stream_next(Req_id, Options) -> ok end. +execute_req(local_test_fun, Method, Args) -> + io:format(" ~-54.54w: ", [Method]), + Result = (catch apply(?MODULE, Method, Args)), + io:format("~p~n", [Result]); execute_req(Url, Method, Options) -> io:format("~7.7w, ~50.50s: ", [Method, Url]), Result = (catch ibrowse:send_req(Url, [], Method, [], Options)), @@ -430,3 +439,81 @@ ue_test(Data) -> log_msg(Fmt, Args) -> io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]). + +%%------------------------------------------------------------------------------ +%% +%%------------------------------------------------------------------------------ + +test_20122010() -> + {ok, Pid} = ibrowse:spawn_worker_process("http://localhost:8181"), + Expected_resp = <<"1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27-28-29-30-31-32-33-34-35-36-37-38-39-40-41-42-43-44-45-46-47-48-49-50-51-52-53-54-55-56-57-58-59-60-61-62-63-64-65-66-67-68-69-70-71-72-73-74-75-76-77-78-79-80-81-82-83-84-85-86-87-88-89-90-91-92-93-94-95-96-97-98-99-100">>, + Test_parent = self(), + Fun = fun() -> + do_test_20122010(Pid, Expected_resp, Test_parent) + end, + Pids = [erlang:spawn_monitor(Fun) || _ <- lists:seq(1,10)], + wait_for_workers(Pids). + +wait_for_workers([{Pid, _Ref} | Pids]) -> + receive + {Pid, success} -> + wait_for_workers(Pids) + after 5000 -> + test_failed + end; +wait_for_workers([]) -> + success. + +do_test_20122010(Pid, Expected_resp, Test_parent) -> + {ibrowse_req_id, Req_id} = ibrowse:send_req_direct( + Pid, + "http://localhost:8181/ibrowse_stream_once_chunk_pipeline_test", + [], get, [], + [{stream_to, {self(), once}}, + {include_ibrowse_req_id, true}]), + do_trace("~p -- sent request ~1000.p~n", [self(), Req_id]), + Req_id_str = lists:flatten(io_lib:format("~1000.p",[Req_id])), + receive + {ibrowse_async_headers, Req_id, "200", Headers} -> + case lists:keysearch("x-ibrowse-request-id", 1, Headers) of + {value, {_, Req_id_str}} -> + ok; + {value, {_, Req_id_1}} -> + do_trace("~p -- Sent req-id: ~1000.p. Recvd: ~1000.p~n", + [self(), Req_id, Req_id_1]), + exit(req_id_mismatch) + end + after 5000 -> + do_trace("~p -- response headers not received~n", [self()]), + exit({timeout, test_failed}) + end, + do_trace("~p -- response headers received~n", [self()]), + ok = ibrowse:stream_next(Req_id), + case do_test_20122010_1(Expected_resp, Req_id, []) of + true -> + Test_parent ! {self(), success}; + false -> + Test_parent ! {self(), failed} + end. + +do_test_20122010_1(Expected_resp, Req_id, Acc) -> + receive + {ibrowse_async_response, Req_id, Body_part} -> + ok = ibrowse:stream_next(Req_id), + do_test_20122010_1(Expected_resp, Req_id, [Body_part | Acc]); + {ibrowse_async_response_end, Req_id} -> + Acc_1 = list_to_binary(lists:reverse(Acc)), + Result = Acc_1 == Expected_resp, + do_trace("~p -- End of response. Result: ~p~n", [self(), Result]), + Result + after 1000 -> + exit({timeout, test_failed}) + end. + +do_trace(Fmt, Args) -> + do_trace(get(my_trace_flag), Fmt, Args). + +do_trace(true, Fmt, Args) -> + io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]); +do_trace(_, _, _) -> + ok. diff --git a/test/ibrowse_test_server.erl b/test/ibrowse_test_server.erl index 37d760f..d3b66bc 100644 --- a/test/ibrowse_test_server.erl +++ b/test/ibrowse_test_server.erl @@ -11,10 +11,13 @@ -record(request, {method, uri, version, headers = [], body = []}). +-define(dec2hex(X), erlang:integer_to_list(X, 16)). + start_server(Port, Sock_type) -> Fun = fun() -> register(server_proc_name(Port), self()), case do_listen(Sock_type, Port, [{active, false}, + {nodelay, true}, {packet, http}]) of {ok, Sock} -> do_trace("Server listening on port: ~p~n", [Port]), @@ -88,6 +91,9 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> {setopts, Opts} -> setopts(Sock, Sock_type, Opts), server_loop(Sock, Sock_type, Req); + {tcp_closed, Sock} -> + do_trace("Client closed connection~n", []), + ok; Other -> do_trace("Recvd unknown msg: ~p~n", [Other]), exit({unknown_msg, Other}) @@ -97,8 +103,34 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) -> end. do_trace(Fmt, Args) -> - io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]). + do_trace(get(my_trace_flag), Fmt, Args). + +do_trace(true, Fmt, Args) -> + io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]); +do_trace(_, _, _) -> + ok. +process_request(Sock, Sock_type, + #request{method='GET', + headers = Headers, + uri = {abs_path, "/ibrowse_stream_once_chunk_pipeline_test"}} = Req) -> + Req_id = case lists:keysearch("X-Ibrowse-Request-Id", 3, Headers) of + false -> + ""; + {value, {http_header, _, _, _, Req_id_1}} -> + Req_id_1 + end, + Req_id_header = ["x-ibrowse-request-id: ", Req_id, "\r\n"], + do_trace("Recvd req: ~p~n", [Req]), + Body = string:join([integer_to_list(X) || X <- lists:seq(1,100)], "-"), + Chunked_body = chunk_request_body(Body, 50), + Resp_1 = [<<"HTTP/1.1 200 OK\r\n">>, + Req_id_header, + <<"Transfer-Encoding: chunked\r\n\r\n">>], + Resp_2 = Chunked_body, + do_send(Sock, Sock_type, Resp_1), + timer:sleep(100), + do_send(Sock, Sock_type, Resp_2); process_request(Sock, Sock_type, Req) -> do_trace("Recvd req: ~p~n", [Req]), Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>, @@ -108,3 +140,52 @@ do_send(Sock, tcp, Resp) -> ok = gen_tcp:send(Sock, Resp); do_send(Sock, ssl, Resp) -> ok = ssl:send(Sock, Resp). + + +%%------------------------------------------------------------------------------ +%% Utility functions +%%------------------------------------------------------------------------------ + +chunk_request_body(Body, _ChunkSize) when is_tuple(Body) orelse + is_function(Body) -> + Body; +chunk_request_body(Body, ChunkSize) -> + chunk_request_body(Body, ChunkSize, []). + +chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] -> + LastChunk = "0\r\n", + lists:reverse(["\r\n", LastChunk | Acc]); +chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body), + size(Body) >= ChunkSize -> + <> = Body, + Chunk = [?dec2hex(ChunkSize),"\r\n", + ChunkBody, "\r\n"], + chunk_request_body(Rest, ChunkSize, [Chunk | Acc]); +chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) -> + BodySize = size(Body), + Chunk = [?dec2hex(BodySize),"\r\n", + Body, "\r\n"], + LastChunk = "0\r\n", + lists:reverse(["\r\n", LastChunk, Chunk | Acc]); +chunk_request_body(Body, ChunkSize, Acc) when length(Body) >= ChunkSize -> + {ChunkBody, Rest} = split_list_at(Body, ChunkSize), + Chunk = [?dec2hex(ChunkSize),"\r\n", + ChunkBody, "\r\n"], + chunk_request_body(Rest, ChunkSize, [Chunk | Acc]); +chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) -> + BodySize = length(Body), + Chunk = [?dec2hex(BodySize),"\r\n", + Body, "\r\n"], + LastChunk = "0\r\n", + lists:reverse(["\r\n", LastChunk, Chunk | Acc]). + +split_list_at(List, N) -> + split_list_at(List, N, []). + +split_list_at([], _, Acc) -> + {lists:reverse(Acc), []}; +split_list_at(List2, 0, List1) -> + {lists:reverse(List1), List2}; +split_list_at([H | List2], N, List1) -> + split_list_at(List2, N-1, [H | List1]). + diff --git a/vsn.mk b/vsn.mk index 264a1a1..ac4bf7a 100644 --- a/vsn.mk +++ b/vsn.mk @@ -1,2 +1,2 @@ -IBROWSE_VSN = 2.1.1 +IBROWSE_VSN = 2.1.2