Pārlūkot izejas kodu

Added stream_full_chunks option

With this option, the process given by stream_to will get a message with chunk data each fully received chunk; stream_chunk_size is set to infinity for this. This is useful for certain applications that stream live data with each unit being represented by a single chunk; one example is CouchDB's continuous changes feed.
pull/151/head
Patrick Schneider pirms 8 gadiem
vecāks
revīzija
fd3c3ad9bf
3 mainītis faili ar 42 papildinājumiem un 15 dzēšanām
  1. +1
    -1
      doc/ibrowse.html
  2. +1
    -0
      src/ibrowse.erl
  3. +40
    -14
      src/ibrowse_http_client.erl

+ 1
- 1
doc/ibrowse.html Parādīt failu

@ -208,7 +208,7 @@ send_req/4, send_req/5, send_req/6.

<h3 class="function"><a name="send_req-5">send_req/5</a></h3>
<div class="spec">
<p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li><li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, boolean()} | {is_ssl, boolean()} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, boolean()} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, boolean()} | {give_raw_headers, boolean()} | {preserve_chunked_encoding, boolean()} | {workaround, head_response_with_body} | {worker_process_options, list()} | {return_raw_request, true} | {max_attempts, integer()}</tt></li><li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li><li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li><li><tt><a name="type-username">username()</a> = string()</tt></li><li><tt><a name="type-password">password()</a> = string()</tt></li><li><tt>SSLOpt = term()</tt></li><li><tt>Sock_opts = [Sock_opt]</tt></li><li><tt>Sock_opt = term()</tt></li><li><tt>ChunkSize = integer()</tt></li><li><tt><a name="type-srtf">srtf()</a> = boolean() | <a href="#type-filename">filename()</a> | {append, <a href="#type-filename">filename()</a>}</tt></li><li><tt><a name="type-filename">filename()</a> = string()</tt></li><li><tt><a name="type-response_format">response_format()</a> = list | binary</tt></li></ul></p>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li><li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_full_chunks, boolean()} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, boolean()} | {is_ssl, boolean()} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, boolean()} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, boolean()} | {give_raw_headers, boolean()} | {preserve_chunked_encoding, boolean()} | {workaround, head_response_with_body} | {worker_process_options, list()} | {return_raw_request, true} | {max_attempts, integer()}</tt></li><li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li><li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li><li><tt><a name="type-username">username()</a> = string()</tt></li><li><tt><a name="type-password">password()</a> = string()</tt></li><li><tt>SSLOpt = term()</tt></li><li><tt>Sock_opts = [Sock_opt]</tt></li><li><tt>Sock_opt = term()</tt></li><li><tt>ChunkSize = integer()</tt></li><li><tt><a name="type-srtf">srtf()</a> = boolean() | <a href="#type-filename">filename()</a> | {append, <a href="#type-filename">filename()</a>}</tt></li><li><tt><a name="type-filename">filename()</a> = string()</tt></li><li><tt><a name="type-response_format">response_format()</a> = list | binary</tt></li></ul></p>
</div><p>Same as send_req/4.</p>
<h3 class="function"><a name="send_req-6">send_req/6</a></h3>

+ 1
- 0
src/ibrowse.erl Parādīt failu

@ -272,6 +272,7 @@ send_req(Url, Headers, Method, Body) ->
%% optionList() = [option()]
%% option() = {max_sessions, integer()} |
%% {response_format,response_format()}|
%% {stream_full_chunks, boolean()} |
%% {stream_chunk_size, integer()} |
%% {max_pipeline_size, integer()} |
%% {trace, boolean()} |

+ 40
- 14
src/ibrowse_http_client.erl Parādīt failu

@ -61,6 +61,7 @@
stream_to, caller_controls_socket = false,
caller_socket_options = [],
req_id,
stream_full_chunks = false,
stream_chunk_size,
save_response_to_file = false,
tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
@ -909,6 +910,7 @@ send_req_1(From,
options = Options,
req_id = ReqId,
save_response_to_file = SaveResponseToFile,
stream_full_chunks = get_value(stream_full_chunks, Options, false),
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from = From,
@ -1451,22 +1453,41 @@ parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
chunk_size = CSz,
recvd_chunk_size = Recvd_csz,
rep_buf_size = RepBufSz} = State) ->
reply_buffer = RepBuf,
rep_buf_size = RepBufSz,
streamed_size = Streamed_size,
cur_req = CurReq} = State) ->
NeedBytes = CSz - Recvd_csz,
DataLen = size(DataRecvd),
do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]),
case DataLen >= NeedBytes of
true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
case accumulate_response(RemChunk, State) of
{error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]),
{error, Reason};
#state{} = State_1 ->
State_2 = State_1#state{chunk_size=tbd},
parse_11_response(RemData, State_2)
case CurReq of
#request{stream_to = StreamTo, caller_controls_socket = false, req_id = ReqId, stream_full_chunks = true, response_format = Response_format} ->
Chunk = <<RepBuf/binary, RemChunk/binary>>,
do_trace("Recvd another chunk...~p~n", [Chunk]),
do_trace("RemData -> ~p~n", [RemData]),
do_interim_reply(StreamTo, Response_format, ReqId, Chunk),
State_1 = State#state{
reply_buffer = <<>>,
rep_buf_size = RepBufSz + size(RemChunk),
interim_reply_sent = true,
streamed_size = Streamed_size + CSz,
chunk_size = tbd,
recvd_chunk_size = 0},
parse_11_response(RemData, State_1);
_ ->
do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
case accumulate_response(RemChunk, State) of
{error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]),
{error, Reason};
#state{} = State_1 ->
State_2 = State_1#state{chunk_size=tbd},
parse_11_response(RemData, State_2)
end
end;
false ->
accumulate_response(DataRecvd,
@ -2065,11 +2086,16 @@ flatten([]) ->
[].
get_stream_chunk_size(Options) ->
case lists:keysearch(stream_chunk_size, 1, Options) of
{value, {_, V}} when V > 0 ->
V;
case get_value(stream_full_chunks, Options, false) of
true ->
infinity;
_ ->
?DEFAULT_STREAM_CHUNK_SIZE
case lists:keysearch(stream_chunk_size, 1, Options) of
{value, {_, V}} when V > 0 ->
V;
_ ->
?DEFAULT_STREAM_CHUNK_SIZE
end
end.
set_inac_timer(State) ->

Notiek ielāde…
Atcelt
Saglabāt