Browse Source

add support for streaming request data

pull/92/head
Eugene Girshov 12 years ago
parent
commit
62cac1f13a
2 changed files with 71 additions and 4 deletions
  1. +20
    -0
      src/ibrowse.erl
  2. +51
    -4
      src/ibrowse_http_client.erl

+ 20
- 0
src/ibrowse.erl View File

@ -87,6 +87,8 @@
send_req_direct/6,
send_req_direct/7,
stream_next/1,
send_chunk/2,
send_done/1,
stream_close/1,
set_max_sessions/3,
set_max_pipeline_size/3,
@ -554,6 +556,24 @@ stream_next(Req_id) ->
ok
end.
send_chunk(Req_id, Data) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {send_chunk, Req_id, Data},
ok
end.
send_done(Req_id) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {send_done, Req_id},
ok
end.
%% @doc Tell ibrowse to close the connection associated with the
%% specified stream. Should be used in conjunction with the
%% <code>stream_to</code> option. Note that all requests in progress on

+ 51
- 4
src/ibrowse_http_client.erl View File

@ -252,6 +252,42 @@ handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
handle_info({send_chunk, Req_id, Data},
#state{cur_req = #request{req_id = Req_id}} = State) ->
Current_request = State#state.cur_req,
Options = Current_request#request.options,
TE = is_chunked_encoding_specified(Options),
case do_send(maybe_chunked_encode(Data, TE), State) of
ok ->
{noreply, State};
_ ->
ets:delete(ibrowse_stream, {req_id_pid, Req_id}),
shutting_down(State),
{stop, normal, State}
end;
handle_info({send_done, Req_id},
#state{cur_req = #request{req_id = Req_id}} = State) ->
Current_request = State#state.cur_req,
Options = Current_request#request.options,
TE = is_chunked_encoding_specified(Options),
case TE of
true -> do_send(<<"0\r\n\r\n">>, State);
_ -> ok
end,
State_2 = inc_pipeline_counter(State),
active_once(State_2),
State_3 = case State#state.status of
idle ->
do_trace("Request send completely, switch to get_header state~n", []),
State_2#state{status = get_header};
_ ->
State_2
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
handle_info(Info, State) ->
io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
[State#state.host, State#state.port, Info]),
@ -737,17 +773,21 @@ send_req_1(From,
ReqId = make_req_id(),
Resp_format = get_value(response_format, Options, list),
Caller_socket_options = get_value(socket_options, Options, []),
Streaming_request = is_streaming_request(Options),
Async_pid_rec = {{req_id_pid, ReqId}, self()},
{StreamTo, Caller_controls_socket} =
case get_value(stream_to, Options, undefined) of
{Caller, once} when is_pid(Caller) or
is_atom(Caller) ->
Async_pid_rec = {{req_id_pid, ReqId}, self()},
true = ets:insert(ibrowse_stream, Async_pid_rec),
{Caller, true};
undefined ->
{undefined, false};
Caller when is_pid(Caller) or
is_atom(Caller) ->
case Streaming_request of true ->
true = ets:insert(ibrowse_stream, Async_pid_rec)
end,
{Caller, false};
Stream_to_inv ->
exit({invalid_option, {stream_to, Stream_to_inv}})
@ -782,8 +822,12 @@ send_req_1(From,
trace_request(Req),
do_setopts(Socket, Caller_socket_options, State_1),
TE = is_chunked_encoding_specified(Options),
case do_send(Req, State_1) of
ok ->
case {do_send(Req, State_1), Streaming_request} of
{ok, true} ->
State_2 = State_1#state{cur_req = NewReq},
do_trace("dont send request body immediately, return reqid~n", []),
{reply, {ibrowse_req_id, ReqId}, State_2};
{ok, false} ->
case do_send_body(Body_1, State_1, TE) of
ok ->
trace_request_body(Body_1),
@ -804,7 +848,7 @@ send_req_1(From,
end,
State_4 = set_inac_timer(State_3),
{noreply, State_4};
Err ->
{Err, _} ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, {send_failed, Err}}),
@ -937,6 +981,9 @@ is_chunked_encoding_specified(Options) ->
true
end.
is_streaming_request(Options) ->
get_value(stream_request, Options, false).
http_vsn_string({0,9}) -> "HTTP/0.9";
http_vsn_string({1,0}) -> "HTTP/1.0";
http_vsn_string({1,1}) -> "HTTP/1.1".

Loading…
Cancel
Save