Browse Source

Merge 330611f372 into af5eebd9f5

pull/92/merge
Eugene Girshov 12 years ago
parent
commit
066bd7e150
4 changed files with 141 additions and 12 deletions
  1. +20
    -0
      src/ibrowse.erl
  2. +53
    -4
      src/ibrowse_http_client.erl
  3. +62
    -8
      test/ibrowse_test.erl
  4. +6
    -0
      test/ibrowse_test_server.erl

+ 20
- 0
src/ibrowse.erl View File

@ -87,6 +87,8 @@
send_req_direct/6,
send_req_direct/7,
stream_next/1,
send_chunk/2,
send_done/1,
stream_close/1,
set_max_sessions/3,
set_max_pipeline_size/3,
@ -554,6 +556,24 @@ stream_next(Req_id) ->
ok
end.
send_chunk(Req_id, Data) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {send_chunk, Req_id, Data},
ok
end.
send_done(Req_id) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {send_done, Req_id},
ok
end.
%% @doc Tell ibrowse to close the connection associated with the
%% specified stream. Should be used in conjunction with the
%% <code>stream_to</code> option. Note that all requests in progress on

+ 53
- 4
src/ibrowse_http_client.erl View File

@ -252,6 +252,42 @@ handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
handle_info({send_chunk, Req_id, Data},
#state{cur_req = #request{req_id = Req_id}} = State) ->
Current_request = State#state.cur_req,
Options = Current_request#request.options,
TE = is_chunked_encoding_specified(Options),
case do_send(maybe_chunked_encode(Data, TE), State) of
ok ->
{noreply, State};
_ ->
ets:delete(ibrowse_stream, {req_id_pid, Req_id}),
shutting_down(State),
{stop, normal, State}
end;
handle_info({send_done, Req_id},
#state{cur_req = #request{req_id = Req_id}} = State) ->
Current_request = State#state.cur_req,
Options = Current_request#request.options,
TE = is_chunked_encoding_specified(Options),
case TE of
true -> do_send(<<"0\r\n\r\n">>, State);
_ -> ok
end,
State_2 = inc_pipeline_counter(State),
active_once(State_2),
State_3 = case State#state.status of
idle ->
do_trace("Request send completely, switch to get_header state~n", []),
State_2#state{status = get_header};
_ ->
State_2
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
handle_info(Info, State) ->
io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
[State#state.host, State#state.port, Info]),
@ -737,17 +773,21 @@ send_req_1(From,
ReqId = make_req_id(),
Resp_format = get_value(response_format, Options, list),
Caller_socket_options = get_value(socket_options, Options, []),
Streaming_request = is_streaming_request(Options),
Async_pid_rec = {{req_id_pid, ReqId}, self()},
{StreamTo, Caller_controls_socket} =
case get_value(stream_to, Options, undefined) of
{Caller, once} when is_pid(Caller) or
is_atom(Caller) ->
Async_pid_rec = {{req_id_pid, ReqId}, self()},
true = ets:insert(ibrowse_stream, Async_pid_rec),
{Caller, true};
undefined ->
{undefined, false};
Caller when is_pid(Caller) or
is_atom(Caller) ->
case Streaming_request of true ->
true = ets:insert(ibrowse_stream, Async_pid_rec)
end,
{Caller, false};
Stream_to_inv ->
exit({invalid_option, {stream_to, Stream_to_inv}})
@ -782,8 +822,12 @@ send_req_1(From,
trace_request(Req),
do_setopts(Socket, Caller_socket_options, State_1),
TE = is_chunked_encoding_specified(Options),
case do_send(Req, State_1) of
ok ->
case {do_send(Req, State_1), Streaming_request} of
{ok, true} ->
State_2 = State_1#state{cur_req = NewReq},
do_trace("dont send request body immediately, return reqid~n", []),
{reply, {ibrowse_req_id, ReqId}, State_2};
{ok, false} ->
case do_send_body(Body_1, State_1, TE) of
ok ->
trace_request_body(Body_1),
@ -804,7 +848,7 @@ send_req_1(From,
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
{Err, _} ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
@ -937,6 +981,9 @@ is_chunked_encoding_specified(Options) ->
true
end.
is_streaming_request(Options) ->
get_value(stream_request, Options, false).
http_vsn_string({0,9}) -> "HTTP/0.9";
http_vsn_string({1,0}) -> "HTTP/1.0";
http_vsn_string({1,1}) -> "HTTP/1.1".
@ -1926,5 +1973,7 @@ trace_request_body(Body) ->
ok
end.
to_binary({X, _}) when is_function(X) -> to_binary(X);
to_binary(X) when is_function(X) -> <<"body generated by function">>;
to_binary(X) when is_list(X) -> list_to_binary(X);
to_binary(X) when is_binary(X) -> X.

+ 62
- 8
test/ibrowse_test.erl View File

@ -27,7 +27,13 @@
test_head_transfer_encoding/0,
test_head_transfer_encoding/1,
test_head_response_with_body/0,
test_head_response_with_body/1
test_head_response_with_body/1,
i_do_streaming_request/4,
i_do_streaming_request2/2,
test_put_request/0,
test_put_request/1,
test_put_request_chunked/0,
test_put_request_chunked/1
]).
test_stream_once(Url, Method, Options) ->
@ -233,7 +239,9 @@ dump_errors(Key, Iod) ->
{local_test_fun, test_20122010, []},
{local_test_fun, test_pipeline_head_timeout, []},
{local_test_fun, test_head_transfer_encoding, []},
{local_test_fun, test_head_response_with_body, []}
{local_test_fun, test_head_response_with_body, []},
{local_test_fun, test_put_request, []},
{local_test_fun, test_put_request_chunked, []}
]).
unit_tests() ->
@ -283,13 +291,13 @@ verify_chunked_streaming(Options) ->
[{response_format, binary} | Options]),
io:format(" Fetching data with streaming as list...~n", []),
Async_response_list = do_async_req_list(
Url, get, [{response_format, list} | Options]),
Url, get, i_do_async_req_list, [{response_format, list} | Options]),
io:format(" Fetching data with streaming as binary...~n", []),
Async_response_bin = do_async_req_list(
Url, get, [{response_format, binary} | Options]),
Url, get, i_do_async_req_list, [{response_format, binary} | Options]),
io:format(" Fetching data with streaming as binary, {active, once}...~n", []),
Async_response_bin_once = do_async_req_list(
Url, get, [once, {response_format, binary} | Options]),
Url, get, i_do_async_req_list, [once, {response_format, binary} | Options]),
Res1 = compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
Res2 = compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once),
case {Res1, Res2} of
@ -307,7 +315,7 @@ test_chunked_streaming_once(Options) ->
Url = "http://www.httpwatch.com/httpgallery/chunked/",
io:format(" URL: ~s~n", [Url]),
io:format(" Fetching data with streaming as binary, {active, once}...~n", []),
case do_async_req_list(Url, get, [once, {response_format, binary} | Options]) of
case do_async_req_list(Url, get, i_do_async_req_list, [once, {response_format, binary} | Options]) of
{ok, _, _, _} ->
io:format(" Success!~n", []);
Err ->
@ -344,8 +352,8 @@ compare_responses(R1, R2, R3) ->
%% do_async_req_list(Url, Method, [{stream_to, self()},
%% {stream_chunk_size, 1000}]).
do_async_req_list(Url, Method, Options) ->
{Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list,
do_async_req_list(Url, Method, Fun, Options) ->
{Pid,_} = erlang:spawn_monitor(?MODULE, Fun,
[self(), Url, Method,
Options ++ [{stream_chunk_size, 1000}]]),
%% io:format("Spawned process ~p~n", [Pid]),
@ -415,6 +423,26 @@ maybe_stream_next(Req_id, Options) ->
ok
end.
i_do_streaming_request(Parent, Url, Method, Options) ->
{Headers, Options_1} = case lists:member(chunked, Options) of
true -> {[], [{transfer_encoding, chunked} | (Options -- [chunked])]};
false -> {[{"Content-Length", "6"}], Options}
end,
Res = ibrowse:send_req(Url, Headers, Method, <<"">>,
[{stream_to, self()} | Options_1]),
case Res of
{ibrowse_req_id, Req_id} ->
Result = i_do_streaming_request2(Req_id, Options),
Parent ! {async_result, self(), Result};
Err ->
Parent ! {async_result, self(), Err}
end.
i_do_streaming_request2(Req_id, Options) ->
ibrowse:send_chunk(Req_id, <<"aaa">>),
ibrowse:send_chunk(Req_id, <<"bbb">>),
ibrowse:send_done(Req_id),
wait_for_async_resp(Req_id, Options, undefined, undefined, []).
execute_req(local_test_fun, Method, Args) ->
io:format(" ~-54.54w: ", [Method]),
Result = (catch apply(?MODULE, Method, Args)),
@ -623,3 +651,29 @@ do_trace(true, Fmt, Args) ->
io:format("~s -- " ++ Fmt, [ibrowse_lib:printable_date() | Args]);
do_trace(_, _, _) ->
ok.
test_put_request() ->
clear_msg_q(),
test_put_request("http://localhost:8181/ibrowse_put_request").
test_put_request(Url) ->
case do_async_req_list(Url, put, i_do_streaming_request,
[{stream_request, true}]) of
{ok, "204", _, _} ->
io:format(" Success!~n", []);
Err ->
io:format(" Fail: ~p~n", [Err])
end.
test_put_request_chunked() ->
clear_msg_q(),
test_put_request_chunked("http://localhost:8181/ibrowse_put_request").
test_put_request_chunked(Url) ->
case do_async_req_list(Url, put, i_do_streaming_request,
[chunked, {stream_request, true}]) of
{ok, "204", _, _} ->
io:format(" Success!~n", []);
Err ->
io:format(" Fail: ~p~n", [Err])
end.

+ 6
- 0
test/ibrowse_test_server.erl View File

@ -159,6 +159,12 @@ process_request(Sock, Sock_type,
uri = {abs_path, "/ibrowse_head_test"}}) ->
Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nTransfer-Encoding: chunked\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type,
#request{method='PUT',
headers = _Headers,
uri = {abs_path, "/ibrowse_put_request"}}) ->
Resp = <<"HTTP/1.1 204 No Content\r\nConnection: close\r\nContent-Length: 0\r\n\r\n">>,
do_send(Sock, Sock_type, Resp);
process_request(Sock, Sock_type, Req) ->
do_trace("Recvd req: ~p~n", [Req]),
Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,

Loading…
Cancel
Save