|
|
@ -61,6 +61,7 @@ |
|
|
|
stream_to, caller_controls_socket = false, |
|
|
|
caller_socket_options = [], |
|
|
|
req_id, |
|
|
|
stream_full_chunks = false, |
|
|
|
stream_chunk_size, |
|
|
|
save_response_to_file = false, |
|
|
|
tmp_file_name, tmp_file_fd, preserve_chunked_encoding, |
|
|
@ -909,6 +910,7 @@ send_req_1(From, |
|
|
|
options = Options, |
|
|
|
req_id = ReqId, |
|
|
|
save_response_to_file = SaveResponseToFile, |
|
|
|
stream_full_chunks = get_value(stream_full_chunks, Options, false), |
|
|
|
stream_chunk_size = get_stream_chunk_size(Options), |
|
|
|
response_format = Resp_format, |
|
|
|
from = From, |
|
|
@ -1451,22 +1453,41 @@ parse_11_response(DataRecvd, |
|
|
|
#state{transfer_encoding = chunked, |
|
|
|
chunk_size = CSz, |
|
|
|
recvd_chunk_size = Recvd_csz, |
|
|
|
rep_buf_size = RepBufSz} = State) -> |
|
|
|
reply_buffer = RepBuf, |
|
|
|
rep_buf_size = RepBufSz, |
|
|
|
streamed_size = Streamed_size, |
|
|
|
cur_req = CurReq} = State) -> |
|
|
|
NeedBytes = CSz - Recvd_csz, |
|
|
|
DataLen = size(DataRecvd), |
|
|
|
do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]), |
|
|
|
case DataLen >= NeedBytes of |
|
|
|
true -> |
|
|
|
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), |
|
|
|
do_trace("Recvd another chunk...~p~n", [RemChunk]), |
|
|
|
do_trace("RemData -> ~p~n", [RemData]), |
|
|
|
case accumulate_response(RemChunk, State) of |
|
|
|
{error, Reason} -> |
|
|
|
do_trace("Error accumulating response --> ~p~n", [Reason]), |
|
|
|
{error, Reason}; |
|
|
|
#state{} = State_1 -> |
|
|
|
State_2 = State_1#state{chunk_size=tbd}, |
|
|
|
parse_11_response(RemData, State_2) |
|
|
|
case CurReq of |
|
|
|
#request{stream_to = StreamTo, caller_controls_socket = false, req_id = ReqId, stream_full_chunks = true, response_format = Response_format} -> |
|
|
|
Chunk = <<RepBuf/binary, RemChunk/binary>>, |
|
|
|
do_trace("Recvd another chunk...~p~n", [Chunk]), |
|
|
|
do_trace("RemData -> ~p~n", [RemData]), |
|
|
|
do_interim_reply(StreamTo, Response_format, ReqId, Chunk), |
|
|
|
State_1 = State#state{ |
|
|
|
reply_buffer = <<>>, |
|
|
|
rep_buf_size = RepBufSz + size(RemChunk), |
|
|
|
interim_reply_sent = true, |
|
|
|
streamed_size = Streamed_size + CSz, |
|
|
|
chunk_size = tbd, |
|
|
|
recvd_chunk_size = 0}, |
|
|
|
parse_11_response(RemData, State_1); |
|
|
|
_ -> |
|
|
|
do_trace("Recvd another chunk...~p~n", [RemChunk]), |
|
|
|
do_trace("RemData -> ~p~n", [RemData]), |
|
|
|
case accumulate_response(RemChunk, State) of |
|
|
|
{error, Reason} -> |
|
|
|
do_trace("Error accumulating response --> ~p~n", [Reason]), |
|
|
|
{error, Reason}; |
|
|
|
#state{} = State_1 -> |
|
|
|
State_2 = State_1#state{chunk_size=tbd}, |
|
|
|
parse_11_response(RemData, State_2) |
|
|
|
end |
|
|
|
end; |
|
|
|
false -> |
|
|
|
accumulate_response(DataRecvd, |
|
|
@ -2065,11 +2086,16 @@ flatten([]) -> |
|
|
|
[]. |
|
|
|
|
|
|
|
get_stream_chunk_size(Options) -> |
|
|
|
case lists:keysearch(stream_chunk_size, 1, Options) of |
|
|
|
{value, {_, V}} when V > 0 -> |
|
|
|
V; |
|
|
|
case get_value(stream_full_chunks, Options, false) of |
|
|
|
true -> |
|
|
|
infinity; |
|
|
|
_ -> |
|
|
|
?DEFAULT_STREAM_CHUNK_SIZE |
|
|
|
case lists:keysearch(stream_chunk_size, 1, Options) of |
|
|
|
{value, {_, V}} when V > 0 -> |
|
|
|
V; |
|
|
|
_ -> |
|
|
|
?DEFAULT_STREAM_CHUNK_SIZE |
|
|
|
end |
|
|
|
end. |
|
|
|
|
|
|
|
set_inac_timer(State) -> |
|
|
|