Просмотр исходного кода

Pipelining wasn't working when used with the {stream_to, {Pid, once}} option

pull/24/head v2.1.2
Chandrashekhar Mullaparthi 14 лет назад
Родитель
Сommit
5b13e5b245
8 измененных файлов: 210 добавлений и 16 удалений
  1. +6
    -1
      README
  2. +1
    -1
      doc/ibrowse.html
  3. +1
    -1
      ebin/ibrowse.app
  4. +1
    -1
      src/ibrowse.erl
  5. +28
    -7
      src/ibrowse_http_client.erl
  6. +90
    -3
      src/ibrowse_test.erl
  7. +82
    -1
      test/ibrowse_test_server.erl
  8. +1
    -1
      vsn.mk

+ 6
- 1
README Просмотреть файл

@ -18,7 +18,7 @@ ibrowse is available under two different licenses. LGPL or the BSD license.
Comments to : Chandrashekhar.Mullaparthi@gmail.com
Version : 2.1.1
Version : 2.1.2
Latest version : git://github.com/cmullaparthi/ibrowse.git
@ -60,6 +60,11 @@ tholschuh (https://github.com/tholschuh/)
CONTRIBUTIONS & CHANGE HISTORY
==============================
20-12-2010 - v2.1.2
* Pipelining wasn't working when used in conjunction with the
{stream_to, {self(), once}} option. Bug report by
Filipe David Manana.
10-12-2010 - v2.1.1
* Fix for https://github.com/cmullaparthi/ibrowse/issues/issue/20
by Filipe David Manana

+ 1
- 1
doc/ibrowse.html Просмотреть файл

@ -12,7 +12,7 @@
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client in erlang.
<p>Copyright © 2005-2010 Chandrashekhar Mullaparthi</p>
<p><b>Version:</b> 2.1.0</p>
<p><b>Version:</b> 2.1.2</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>

+ 1
- 1
ebin/ibrowse.app Просмотреть файл

@ -1,6 +1,6 @@
{application, ibrowse,
[{description, "HTTP client application"},
{vsn, "2.1.1"},
{vsn, "2.1.2"},
{modules, [ ibrowse,
ibrowse_http_client,
ibrowse_app,

+ 1
- 1
src/ibrowse.erl Просмотреть файл

@ -7,7 +7,7 @@
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2010 Chandrashekhar Mullaparthi
%% @version 2.1.1
%% @version 2.1.2
%% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is

+ 28
- 7
src/ibrowse_http_client.erl Просмотреть файл

@ -191,6 +191,14 @@ handle_info({stream_next, Req_id}, #state{socket = Socket,
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
_Cur_req_id = case State#state.cur_req of
#request{req_id = Cur} ->
Cur;
_ ->
undefined
end,
%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n",
%% [_Req_id, _Cur_req_id]),
{noreply, State};
handle_info({stream_close, _Req_id}, State) ->
@ -625,7 +633,7 @@ send_req_1(From,
Path = [Server_host, $:, integer_to_list(Server_port)],
{Req, Body_1} = make_request(connect, Pxy_auth_headers,
Path, Path,
[], Options, State_1),
[], Options, State_1, undefined),
TE = is_chunked_encoding_specified(Options),
trace_request(Req),
case do_send(Req, State) of
@ -711,7 +719,8 @@ send_req_1(From,
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method,
Headers_1,
AbsPath, RelPath, Body, Options, State_1),
AbsPath, RelPath, Body, Options, State_1,
ReqId),
trace_request(Req),
do_setopts(Socket, Caller_socket_options, State_1),
TE = is_chunked_encoding_specified(Options),
@ -811,7 +820,7 @@ http_auth_digest(Username, Password) ->
ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options,
#state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
#state{use_proxy = UseProxy, is_ssl = Is_ssl}, ReqId) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Fun1 = fun({X, Y}) when is_atom(X) ->
{to_lower(atom_to_list(X)), X, Y};
@ -847,7 +856,13 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
[{"Transfer-Encoding", "chunked"}],
chunk_request_body(Body, Chunk_size_1)}
end,
Headers_3 = cons_headers(Headers_2),
Headers_3 = case lists:member({include_ibrowse_req_id, true}, Options) of
true ->
[{"x-ibrowse-request-id", io_lib:format("~1000.p",[ReqId])} | Headers_2];
false ->
Headers_2
end,
Headers_4 = cons_headers(Headers_3),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
case Is_ssl of
@ -859,7 +874,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
false ->
RelPath
end,
{[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}.
{[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_4, crnl()], Body_1}.
is_chunked_encoding_specified(Options) ->
case get_value(transfer_encoding, Options, false) of
@ -1303,11 +1318,17 @@ reset_state(State) ->
transfer_encoding = undefined
}.
set_cur_request(#state{reqs = Reqs} = State) ->
set_cur_request(#state{reqs = Reqs, socket = Socket} = State) ->
case queue:to_list(Reqs) of
[] ->
State#state{cur_req = undefined};
[NextReq | _] ->
[#request{caller_controls_socket = Ccs} = NextReq | _] ->
case Ccs of
true ->
do_setopts(Socket, [{active, once}], State);
_ ->
ok
end,
State#state{cur_req = NextReq}
end.

+ 90
- 3
src/ibrowse_test.erl Просмотреть файл

@ -20,7 +20,8 @@
test_chunked_streaming_once/0,
i_do_async_req_list/4,
test_stream_once/3,
test_stream_once/4
test_stream_once/4,
test_20122010/0
]).
test_stream_once(Url, Method, Options) ->
@ -218,7 +219,8 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
{"http://jigsaw.w3.org/HTTP/CL/", get},
{"http://www.httpwatch.com/httpgallery/chunked/", get},
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]}
{"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
{local_test_fun, test_20122010, []}
]).
unit_tests() ->
@ -228,6 +230,7 @@ unit_tests(Options) ->
application:start(crypto),
application:start(public_key),
application:start(ssl),
(catch ibrowse_test_server:start_server(8181, tcp)),
ibrowse:start(),
Options_1 = Options ++ [{connect_timeout, 5000}],
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
@ -242,7 +245,9 @@ unit_tests(Options) ->
end.
unit_tests_1(Parent, Options) ->
lists:foreach(fun({Url, Method}) ->
lists:foreach(fun({local_test_fun, Fun_name, Args}) ->
execute_req(local_test_fun, Fun_name, Args);
({Url, Method}) ->
execute_req(Url, Method, Options);
({Url, Method, X_Opts}) ->
execute_req(Url, Method, X_Opts ++ Options)
@ -394,6 +399,10 @@ maybe_stream_next(Req_id, Options) ->
ok
end.
execute_req(local_test_fun, Method, Args) ->
io:format(" ~-54.54w: ", [Method]),
Result = (catch apply(?MODULE, Method, Args)),
io:format("~p~n", [Result]);
execute_req(Url, Method, Options) ->
io:format("~7.7w, ~50.50s: ", [Method, Url]),
Result = (catch ibrowse:send_req(Url, [], Method, [], Options)),
@ -430,3 +439,81 @@ ue_test(Data) ->
log_msg(Fmt, Args) ->
io:format("~s -- " ++ Fmt,
[ibrowse_lib:printable_date() | Args]).
%%------------------------------------------------------------------------------
%%
%%------------------------------------------------------------------------------
test_20122010() ->
{ok, Pid} = ibrowse:spawn_worker_process("http://localhost:8181"),
Expected_resp = <<"1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27-28-29-30-31-32-33-34-35-36-37-38-39-40-41-42-43-44-45-46-47-48-49-50-51-52-53-54-55-56-57-58-59-60-61-62-63-64-65-66-67-68-69-70-71-72-73-74-75-76-77-78-79-80-81-82-83-84-85-86-87-88-89-90-91-92-93-94-95-96-97-98-99-100">>,
Test_parent = self(),
Fun = fun() ->
do_test_20122010(Pid, Expected_resp, Test_parent)
end,
Pids = [erlang:spawn_monitor(Fun) || _ <- lists:seq(1,10)],
wait_for_workers(Pids).
wait_for_workers([{Pid, _Ref} | Pids]) ->
receive
{Pid, success} ->
wait_for_workers(Pids)
after 5000 ->
test_failed
end;
wait_for_workers([]) ->
success.
do_test_20122010(Pid, Expected_resp, Test_parent) ->
{ibrowse_req_id, Req_id} = ibrowse:send_req_direct(
Pid,
"http://localhost:8181/ibrowse_stream_once_chunk_pipeline_test",
[], get, [],
[{stream_to, {self(), once}},
{include_ibrowse_req_id, true}]),
do_trace("~p -- sent request ~1000.p~n", [self(), Req_id]),
Req_id_str = lists:flatten(io_lib:format("~1000.p",[Req_id])),
receive
{ibrowse_async_headers, Req_id, "200", Headers} ->
case lists:keysearch("x-ibrowse-request-id", 1, Headers) of
{value, {_, Req_id_str}} ->
ok;
{value, {_, Req_id_1}} ->
do_trace("~p -- Sent req-id: ~1000.p. Recvd: ~1000.p~n",
[self(), Req_id, Req_id_1]),
exit(req_id_mismatch)
end
after 5000 ->
do_trace("~p -- response headers not received~n", [self()]),
exit({timeout, test_failed})
end,
do_trace("~p -- response headers received~n", [self()]),
ok = ibrowse:stream_next(Req_id),
case do_test_20122010_1(Expected_resp, Req_id, []) of
true ->
Test_parent ! {self(), success};
false ->
Test_parent ! {self(), failed}
end.
do_test_20122010_1(Expected_resp, Req_id, Acc) ->
receive
{ibrowse_async_response, Req_id, Body_part} ->
ok = ibrowse:stream_next(Req_id),
do_test_20122010_1(Expected_resp, Req_id, [Body_part | Acc]);
{ibrowse_async_response_end, Req_id} ->
Acc_1 = list_to_binary(lists:reverse(Acc)),
Result = Acc_1 == Expected_resp,
do_trace("~p -- End of response. Result: ~p~n", [self(), Result]),
Result
after 1000 ->
exit({timeout, test_failed})
end.
do_trace(Fmt, Args) ->
do_trace(get(my_trace_flag), Fmt, Args).
do_trace(true, Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]);
do_trace(_, _, _) ->
ok.

+ 82
- 1
test/ibrowse_test_server.erl Просмотреть файл

@ -11,10 +11,13 @@
-record(request, {method, uri, version, headers = [], body = []}).
-define(dec2hex(X), erlang:integer_to_list(X, 16)).
start_server(Port, Sock_type) ->
Fun = fun() ->
register(server_proc_name(Port), self()),
case do_listen(Sock_type, Port, [{active, false},
{nodelay, true},
{packet, http}]) of
{ok, Sock} ->
do_trace("Server listening on port: ~p~n", [Port]),
@ -88,6 +91,9 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
{setopts, Opts} ->
setopts(Sock, Sock_type, Opts),
server_loop(Sock, Sock_type, Req);
{tcp_closed, Sock} ->
do_trace("Client closed connection~n", []),
ok;
Other ->
do_trace("Recvd unknown msg: ~p~n", [Other]),
exit({unknown_msg, Other})
@ -97,8 +103,34 @@ server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
end.
do_trace(Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]).
do_trace(get(my_trace_flag), Fmt, Args).
do_trace(true, Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]);
do_trace(_, _, _) ->
ok.
process_request(Sock, Sock_type,
#request{method='GET',
headers = Headers,
uri = {abs_path, "/ibrowse_stream_once_chunk_pipeline_test"}} = Req) ->
Req_id = case lists:keysearch("X-Ibrowse-Request-Id", 3, Headers) of
false ->
"";
{value, {http_header, _, _, _, Req_id_1}} ->
Req_id_1
end,
Req_id_header = ["x-ibrowse-request-id: ", Req_id, "\r\n"],
do_trace("Recvd req: ~p~n", [Req]),
Body = string:join([integer_to_list(X) || X <- lists:seq(1,100)], "-"),
Chunked_body = chunk_request_body(Body, 50),
Resp_1 = [<<"HTTP/1.1 200 OK\r\n">>,
Req_id_header,
<<"Transfer-Encoding: chunked\r\n\r\n">>],
Resp_2 = Chunked_body,
do_send(Sock, Sock_type, Resp_1),
timer:sleep(100),
do_send(Sock, Sock_type, Resp_2);
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,
@ -108,3 +140,52 @@ do_send(Sock, tcp, Resp) ->
ok = gen_tcp:send(Sock, Resp);
do_send(Sock, ssl, Resp) ->
ok = ssl:send(Sock, Resp).
%%------------------------------------------------------------------------------
%% Utility functions
%%------------------------------------------------------------------------------
chunk_request_body(Body, _ChunkSize) when is_tuple(Body) orelse
is_function(Body) ->
Body;
chunk_request_body(Body, ChunkSize) ->
chunk_request_body(Body, ChunkSize, []).
chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk | Acc]);
chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
size(Body) >= ChunkSize ->
<<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
Chunk = [?dec2hex(ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
BodySize = size(Body),
Chunk = [?dec2hex(BodySize),"\r\n",
Body, "\r\n"],
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
chunk_request_body(Body, ChunkSize, Acc) when length(Body) >= ChunkSize ->
{ChunkBody, Rest} = split_list_at(Body, ChunkSize),
Chunk = [?dec2hex(ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
BodySize = length(Body),
Chunk = [?dec2hex(BodySize),"\r\n",
Body, "\r\n"],
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk, Chunk | Acc]).
split_list_at(List, N) ->
split_list_at(List, N, []).
split_list_at([], _, Acc) ->
{lists:reverse(Acc), []};
split_list_at(List2, 0, List1) ->
{lists:reverse(List1), List2};
split_list_at([H | List2], N, List1) ->
split_list_at(List2, N-1, [H | List1]).

+ 1
- 1
vsn.mk Просмотреть файл

@ -1,2 +1,2 @@
IBROWSE_VSN = 2.1.1
IBROWSE_VSN = 2.1.2

Загрузка…
Отмена
Сохранить