浏览代码

Added option {stream_to, {process(), once}} to allow calling process to control data rate on socket

pull/16/head
Chandrashekhar Mullaparthi 16 年前
父节点
当前提交
2768725f81
共有 10 个文件被更改,包括 430 次插入335 次删除
  1. +1
    -1
      LICENSE
  2. +89
    -82
      README
  3. +24
    -5
      doc/ibrowse.html
  4. +1
    -1
      ebin/ibrowse.app
  5. +33
    -19
      src/ibrowse.erl
  6. +224
    -217
      src/ibrowse_http_client.erl
  7. +7
    -1
      src/ibrowse_lb.erl
  8. +6
    -6
      src/ibrowse_lib.erl
  9. +44
    -2
      src/ibrowse_test.erl
  10. +1
    -1
      vsn.mk

+ 1
- 1
LICENSE 查看文件

@ -1,5 +1,5 @@
ibrowse - a HTTP client written in erlang
Copyright (C) 2005 Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
Copyright (C) 2005-2009 Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.

+ 89
- 82
README 查看文件

@ -1,18 +1,18 @@
ibrowse is a HTTP client. The following are a list of features.
- RFC2616 compliant (AFAIK)
- supports GET, POST, OPTIONS, HEAD, PUT, DELETE, TRACE,
- RFC2616 compliant (AFAIK)
- supports GET, POST, OPTIONS, HEAD, PUT, DELETE, TRACE,
MKCOL, PROPFIND, PROPPATCH, LOCK, UNLOCK, MOVE and COPY
- Understands HTTP/0.9, HTTP/1.0 and HTTP/1.1
- Understands chunked encoding
- Understands HTTP/0.9, HTTP/1.0 and HTTP/1.1
- Understands chunked encoding
- Can generate requests using Chunked Transfer-Encoding
- Pools of connections to each webserver
- Pipelining support
- Download to file
- Asynchronous requests. Responses are streamed to a process
- Basic authentication
- Supports proxy authentication
- Can talk to Secure webservers using SSL
- any other features in the code not listed here :)
- Pools of connections to each webserver
- Pipelining support
- Download to file
- Asynchronous requests. Responses are streamed to a process
- Basic authentication
- Supports proxy authentication
- Can talk to Secure webservers using SSL
- any other features in the code not listed here :)
ibrowse is available under two different licenses. LGPL and the BSD license.
@ -24,24 +24,31 @@ Latest version : git://github.com/cmullaparthi/ibrowse.git
CONTRIBUTIONS & CHANGE HISTORY
==============================
29-06-2009 - * Fixed following issues reported by Oscar Hellström
03-07-2009 - Added option {stream_to, {Pid, once}} which allows the caller
to control when it wants to receive more data. If this option
is used, the call ibrowse:stream_next(Req_id) should be used
to get more data.
- Patch submitted by Steve Vinoski to remove compiler warnings
about the use of obsolete guards
29-06-2009 - * Fixed following issues reported by Oscar Hellström
- Use {active, once} instead of {active, true}
- Fix 'dodgy' timeout handling
- Use binaries internally instead of lists to reduce memory
- Fix 'dodgy' timeout handling
- Use binaries internally instead of lists to reduce memory
consumption on 64 bit platforms. The default response format
is still 'list' to maintain backwards compatibility. Use the
option {response_format, binary} to get responses as binaries.
* Fixed chunking bug (reported by Adam Kocoloski)
* Added new option {inactivity_timeout, Milliseconds} to timeout
* Fixed chunking bug (reported by Adam Kocoloski)
* Added new option {inactivity_timeout, Milliseconds} to timeout
requests if no data is received on the link for the specified
interval. Useful when responses are large and links are flaky.
* Added ibrowse:all_trace_off/0 to turn off all tracing
* Change to the way responses to asynchronous requests are
* Added ibrowse:all_trace_off/0 to turn off all tracing
* Change to the way responses to asynchronous requests are
returned. The following messages have been removed.
* {ibrowse_async_response, Req_id, {chunk_start, Chunk_size}}
* {ibrowse_async_response, Req_id, chunk_end}
* Fixed Makefiles as part of Debian packaging
(thanks to Thomas Lindgren)
* {ibrowse_async_response, Req_id, chunk_end}
* Fixed Makefiles as part of Debian packaging
(thanks to Thomas Lindgren)
* Moved repository from Sourceforge to Github
11-06-2009 - * Added option to control size of streamed chunks. Also added
@ -83,7 +90,7 @@ CONTRIBUTIONS & CHANGE HISTORY
17-10-2007 - Matthew Reilly (matthew dot reilly _at_ sipphone dot com)
sent a bug report and a fix. If the chunk trailer spans two TCP
packets, then ibrowse fails to recognise that the chunked transfer
has ended.
has ended.
29-08-2007 - Bug report by Peter Kristensen(ptx _at_ daimi dot au dot dk).
ibrowse crashes when the webserver returns just the Status line
@ -104,7 +111,7 @@ CONTRIBUTIONS & CHANGE HISTORY
12-01-2007 - Derek Upham sent in a bug fix. The reset_state function was not
behaving correctly when the transfer encoding was not chunked.
13-11-2006 - Youn�s Hafri reported a bug where ibrowse was not returning the
13-11-2006 - Youn�s Hafri reported a bug where ibrowse was not returning the
temporary filename when the server was closing the connection
after sending the data (as in HTTP/1.0).
Released ibrowse under the BSD license
@ -123,8 +130,8 @@ CONTRIBUTIONS & CHANGE HISTORY
22-Nov-2005 - Added ability to generate requests using the Chunked
Transfer-Encoding.
08-May-2005 - Youn�s Hafri made a CRUX LINUX port of ibrowse.
http://yhafri.club.fr/crux/index.html
08-May-2005 - Youn�s Hafri made a CRUX LINUX port of ibrowse.
http://yhafri.club.fr/crux/index.html
Here are some usage examples. Enjoy!
@ -147,10 +154,10 @@ Here are some usage examples. Enjoy!
%% =============================================================================
%% A GET using a proxy
7> ibrowse:send_req("http://www.google.com/", [], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080}], 1000).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080}], 1000).
{ok,"302",
[{"Date","Fri, 17 Dec 2004 15:22:56 GMT"},
{"Content-Length","217"},
@ -170,20 +177,20 @@ Here are some usage examples. Enjoy!
%% be set using the application env var 'download_dir' - the default
%% is the current working directory.
8> ibrowse:send_req("http://www.erlang.se/", [], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{save_response_to_file, true}], 1000).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{save_response_to_file, true}], 1000).
{error,req_timedout}
%% =============================================================================
9> ibrowse:send_req("http://www.erlang.se/", [], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{save_response_to_file, true}], 5000).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{save_response_to_file, true}], 5000).
{ok,"200",
[{"Transfer-Encoding","chunked"},
{"Date","Fri, 17 Dec 2004 15:24:36 GMT"},
@ -197,7 +204,7 @@ Here are some usage examples. Enjoy!
%% number of maximum connections to this server to 10 and the pipeline
%% size to 1. Connections are setup a required.
11> ibrowse:set_dest("www.hotmail.com", 80, [{max_sessions, 10},
{max_pipeline_size, 1}]).
{max_pipeline_size, 1}]).
ok
%% =============================================================================
@ -231,51 +238,51 @@ ok
%% =============================================================================
%% Example of using Asynchronous requests
18> ibrowse:send_req("http://www.google.com", [], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{stream_to, self()}]).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{stream_to, self()}]).
{ibrowse_req_id,{1115,327256,389608}}
19> flush().
Shell got {ibrowse_async_headers,{1115,327256,389608},
"302",
[{"Date","Thu, 05 May 2005 21:06:41 GMT"},
{"Content-Length","217"},
{"Content-Type","text/html"},
{"Set-Cookie",
"PREF=ID=b601f16bfa32f071:CR=1:TM=1115327201:LM=1115327201:S=OX5hSB525AMjUUu7; expires=Sun, 17-Jan-2038 19:14:07 GMT; path=/; domain=.google.com"},
{"Server","GWS/2.1"},
{"Location",
"http://www.google.co.uk/cxfer?c=PREF%3D:TM%3D1115327201:S%3DDS9pDJ4IHcAuZ_AS&prev=/"},
{"Via",
"1.1 hatproxy01 (NetCache NetApp/5.6.2)"}]}
"302",
[{"Date","Thu, 05 May 2005 21:06:41 GMT"},
{"Content-Length","217"},
{"Content-Type","text/html"},
{"Set-Cookie",
"PREF=ID=b601f16bfa32f071:CR=1:TM=1115327201:LM=1115327201:S=OX5hSB525AMjUUu7; expires=Sun, 17-Jan-2038 19:14:07 GMT; path=/; domain=.google.com"},
{"Server","GWS/2.1"},
{"Location",
"http://www.google.co.uk/cxfer?c=PREF%3D:TM%3D1115327201:S%3DDS9pDJ4IHcAuZ_AS&prev=/"},
{"Via",
"1.1 hatproxy01 (NetCache NetApp/5.6.2)"}]}
Shell got {ibrowse_async_response,{1115,327256,389608},
"<HTML><HEAD><TITLE>302 Moved</TITLE></HEAD><BODY>\n<H1>302 Moved</H1>\nThe document has moved\n<A HREF=\"http://www.google.co.uk/cxfer?c=PREF%3D:TM%3D1115327201:S%3DDS9pDJ4IHcAuZ_AS&amp;prev=/\">here</A>.\r\n</BODY></HTML>\r\n"}
"<HTML><HEAD><TITLE>302 Moved</TITLE></HEAD><BODY>\n<H1>302 Moved</H1>\nThe document has moved\n<A HREF=\"http://www.google.co.uk/cxfer?c=PREF%3D:TM%3D1115327201:S%3DDS9pDJ4IHcAuZ_AS&amp;prev=/\">here</A>.\r\n</BODY></HTML>\r\n"}
Shell got {ibrowse_async_response_end,{1115,327256,389608}}
ok
%% =============================================================================
%% Another example of using async requests
24> ibrowse:send_req("http://yaws.hyber.org/simple_ex2.yaws", [], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{stream_to, self()}]).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{stream_to, self()}]).
{ibrowse_req_id,{1115,327430,512314}}
25> flush().
Shell got {ibrowse_async_headers,{1115,327430,512314},
"200",
[{"Date","Thu, 05 May 2005 20:58:08 GMT"},
{"Content-Length","64"},
{"Content-Type","text/html;charset="},
{"Server",
"Yaws/1.54 Yet Another Web Server"},
{"Via",
"1.1 hatproxy01 (NetCache NetApp/5.6.2)"}]}
"200",
[{"Date","Thu, 05 May 2005 20:58:08 GMT"},
{"Content-Length","64"},
{"Content-Type","text/html;charset="},
{"Server",
"Yaws/1.54 Yet Another Web Server"},
{"Via",
"1.1 hatproxy01 (NetCache NetApp/5.6.2)"}]}
Shell got {ibrowse_async_response,{1115,327430,512314},
"<html>\n\n\n<h1> Yesssssss </h1>\n\n<h2> Hello again </h2>\n\n\n</html>\n"}
"<html>\n\n\n<h1> Yesssssss </h1>\n\n<h2> Hello again </h2>\n\n\n</html>\n"}
Shell got {ibrowse_async_response_end,{1115,327430,512314}}
%% =============================================================================
@ -287,12 +294,12 @@ Shell got {ibrowse_async_response_end,{1115,327430,512314}}
%% Example of request using both Proxy-Authorization and authorization by the final webserver.
17> ibrowse:send_req("http://www.erlang.se/lic_area/protected/patches/erl_756_otp_beam.README",
[], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{basic_auth, {"XXXXX", "XXXXXX"}}]).
[], get, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080},
{basic_auth, {"XXXXX", "XXXXXX"}}]).
{ok,"200",
[{"Accept-Ranges","bytes"},
{"Date","Thu, 05 May 2005 21:02:09 GMT"},
@ -309,10 +316,10 @@ Shell got {ibrowse_async_response_end,{1115,327430,512314}}
%% support this. Nor did www.google.com. But good old BBC supports
%% this.
35> 37> ibrowse:send_req("http://www.bbc.co.uk/", [], trace, [],
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080}]).
[{proxy_user, "XXXXX"},
{proxy_password, "XXXXX"},
{proxy_host, "proxy"},
{proxy_port, 8080}]).
{ok,"200",
[{"Transfer-Encoding","chunked"},
{"Date","Thu, 05 May 2005 21:40:27 GMT"},

+ 24
- 5
doc/ibrowse.html 查看文件

@ -12,7 +12,7 @@
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client.
<p>Copyright © 2005-2009 Chandrashekhar Mullaparthi</p>
<p><b>Version:</b> 1.5.0</p>
<p><b>Version:</b> 1.5.1</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>
@ -101,6 +101,8 @@ send_req/4, send_req/5, send_req/6.

<tr><td valign="top"><a href="#stop-0">stop/0</a></td><td>Stop the ibrowse process.</td></tr>
<tr><td valign="top"><a href="#stop_worker_process-1">stop_worker_process/1</a></td><td>Terminate a worker process spawned using
spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#stream_next-1">stream_next/1</a></td><td>Tell ibrowse to stream the next chunk of data to the
caller.</td></tr>
<tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr>
<tr><td valign="top"><a href="#trace_off-0">trace_off/0</a></td><td>Turn tracing off for the ibrowse process.</td></tr>
<tr><td valign="top"><a href="#trace_off-2">trace_off/2</a></td><td>Turn tracing OFF for all connections to the specified HTTP
@ -177,7 +179,8 @@ send_req/4, send_req/5, send_req/6.

<li><tt><a name="type-respHeader">respHeader()</a> = {<a href="#type-headerName">headerName()</a>, <a href="#type-headerValue">headerValue()</a>}</tt></li>
<li><tt><a name="type-headerName">headerName()</a> = string()</tt></li>
<li><tt><a name="type-headerValue">headerValue()</a> = string()</tt></li>
<li><tt><a name="type-response">response()</a> = {ok, Status, ResponseHeaders, ResponseBody} | {error, Reason}</tt></li>
<li><tt><a name="type-response">response()</a> = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, <a href="#type-req_id">req_id()</a>} | {error, Reason}</tt></li>
<li><tt><a name="type-req_id">req_id()</a> = term()</tt></li>
<li><tt>ResponseBody = string() | {file, Filename}</tt></li>
<li><tt>Reason = term()</tt></li>
</ul></p>
@ -199,7 +202,8 @@ send_req/4, send_req/5, send_req/6.

<div class="spec">
<p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li>
<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-process">process()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {transfer_encoding, {chunked, ChunkSize}}</tt></li>
<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {transfer_encoding, {chunked, ChunkSize}}</tt></li>
<li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li>
<li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li>
<li><tt><a name="type-username">username()</a> = string()</tt></li>
<li><tt><a name="type-password">password()</a> = string()</tt></li>
@ -210,7 +214,7 @@ send_req/4, send_req/5, send_req/6.

<li><tt><a name="type-response_format">response_format()</a> = list | binary</tt></li>
</ul></p>
</div><p>Same as send_req/4.
For a description of SSL Options, look in the ssl manpage. If the
For a description of SSL Options, look in the <a href="http://www.erlang.org/doc/apps/ssl/index.html">ssl</a> manpage. If the
HTTP Version to use is not specified, the default is 1.1.
<br>
<p>The <code>host_header</code> option is useful in the case where ibrowse is
@ -221,6 +225,14 @@ send_req/4, send_req/5, send_req/6.

used to specify what should go in the <code>Host</code> header in
the request.</p>
<ul>
<li>The <code>stream_to</code> option can be used to have the HTTP
response streamed to a process as messages as data arrives on the
socket. If the calling process wishes to control the rate at which
data is received from the server, the option <code>{stream_to,
{process(), once}}</code> can be specified. The calling process
will have to invoke <code>ibrowse:stream_next(Request_id)</code> to
receive the next packet.</li>
<li>When both the options <code>save_response_to_file</code> and <code>stream_to</code>
are specified, the former takes precedence.</li>
@ -360,6 +372,13 @@ send_req/4, send_req/5, send_req/6.

spawn_worker_process/2 or spawn_link_worker_process/2. Requests in
progress will get the error response <pre>{error, closing_on_request}</pre></p>
<h3 class="function"><a name="stream_next-1">stream_next/1</a></h3>
<div class="spec">
<p><tt>stream_next(Req_id::<a href="#type-req_id">req_id()</a>) -&gt; ok | {error, unknown_req_id}</tt></p>
</div><p>Tell ibrowse to stream the next chunk of data to the
caller. Should be used in conjunction with the
<code>stream_to</code> option</p>
<h3 class="function"><a name="terminate-2">terminate/2</a></h3>
<div class="spec">
<p><tt>terminate(Reason, State) -&gt; any()</tt></p>
@ -392,6 +411,6 @@ send_req/4, send_req/5, send_req/6.

<hr>
<div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div>
<p><i>Generated by EDoc, Jun 30 2009, 23:44:01.</i></p>
<p><i>Generated by EDoc, Jul 7 2009, 23:13:24.</i></p>
</body>
</html>

+ 1
- 1
ebin/ibrowse.app 查看文件

@ -1,6 +1,6 @@
{application, ibrowse,
[{description, "HTTP client application"},
{vsn, "1.5.0"},
{vsn, "1.5.1"},
{modules, [ ibrowse,
ibrowse_http_client,
ibrowse_app,

+ 33
- 19
src/ibrowse.erl 查看文件

@ -7,7 +7,7 @@
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2009 Chandrashekhar Mullaparthi
%% @version 1.5.0
%% @version 1.5.1
%% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@ -89,6 +89,7 @@
send_req_direct/5,
send_req_direct/6,
send_req_direct/7,
stream_next/1,
set_max_sessions/3,
set_max_pipeline_size/3,
set_dest/3,
@ -150,7 +151,8 @@ stop() ->
%% respHeader() = {headerName(), headerValue()}
%% headerName() = string()
%% headerValue() = string()
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {error, Reason}
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
%% req_id() = term()
%% ResponseBody = string() | {file, Filename}
%% Reason = term()
send_req(Url, Headers, Method) ->
@ -167,7 +169,7 @@ send_req(Url, Headers, Method, Body) ->
send_req(Url, Headers, Method, Body, []).
%% @doc Same as send_req/4.
%% For a description of SSL Options, look in the ssl manpage. If the
%% For a description of SSL Options, look in the <a href="http://www.erlang.org/doc/apps/ssl/index.html">ssl</a> manpage. If the
%% HTTP Version to use is not specified, the default is 1.1.
%% <br/>
%% <p>The <code>host_header</code> option is useful in the case where ibrowse is
@ -179,6 +181,14 @@ send_req(Url, Headers, Method, Body) ->
%% used to specify what should go in the <code>Host</code> header in
%% the request.</p>
%% <ul>
%% <li>The <code>stream_to</code> option can be used to have the HTTP
%% response streamed to a process as messages as data arrives on the
%% socket. If the calling process wishes to control the rate at which
%% data is received from the server, the option <code>{stream_to,
%% {process(), once}}</code> can be specified. The calling process
%% will have to invoke <code>ibrowse:stream_next(Request_id)</code> to
%% receive the next packet.</li>
%%
%% <li>When both the options <code>save_response_to_file</code> and <code>stream_to</code>
%% are specified, the former takes precedence.</li>
%%
@ -237,13 +247,14 @@ send_req(Url, Headers, Method, Body) ->
%% {content_length, integer()} |
%% {content_type, string()} |
%% {save_response_to_file, srtf()} |
%% {stream_to, process()} |
%% {stream_to, stream_to()} |
%% {http_vsn, {MajorVsn, MinorVsn}} |
%% {host_header, string()} |
%% {inactivity_timeout, integer()} |
%% {connect_timeout, integer()} |
%% {transfer_encoding, {chunked, ChunkSize}}
%%
%% stream_to() = process() | {process(), once}
%% process() = pid() | atom()
%% username() = string()
%% password() = string()
@ -425,7 +436,20 @@ send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
Err ->
{error, {url_parsing_failed, Err}}
end.
%% @doc Tell ibrowse to stream the next chunk of data to the
%% caller. Should be used in conjunction with the
%% <code>stream_to</code> option
%% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
stream_next(Req_id) ->
case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
[] ->
{error, unknown_req_id};
[{_, Pid}] ->
catch Pid ! {stream_next, Req_id},
ok
end.
%% @doc Turn tracing on for the ibrowse process
trace_on() ->
ibrowse ! {trace, true}.
@ -522,6 +546,7 @@ init(_) ->
put(ibrowse_trace_token, "ibrowse"),
ets:new(ibrowse_lb, [named_table, public, {keypos, 2}]),
ets:new(ibrowse_conf, [named_table, protected, {keypos, 2}]),
ets:new(ibrowse_stream, [named_table, public]),
import_config(),
{ok, #state{}}.
@ -539,9 +564,9 @@ import_config(Filename) ->
{ok, Terms} ->
ets:delete_all_objects(ibrowse_conf),
Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options})
when list(Host), integer(Port),
integer(MaxSess), MaxSess > 0,
integer(MaxPipe), MaxPipe > 0, list(Options) ->
when is_list(Host), is_integer(Port),
is_integer(MaxSess), MaxSess > 0,
is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
I = [{{max_sessions, Host, Port}, MaxSess},
{{max_pipeline_size, Host, Port}, MaxPipe},
{{options, Host, Port}, Options}],
@ -641,13 +666,6 @@ handle_info(all_trace_off, State) ->
true ->
catch Pid ! {trace, false}
end;
(#client_conn{key = {H, P, Pid}}, _) ->
case lists:member({H, P}, Trace_on_dests) of
false ->
ok;
true ->
catch Pid ! {trace, false}
end;
(_, Acc) ->
Acc
end,
@ -661,10 +679,6 @@ handle_info({trace, Bool}, State) ->
handle_info({trace, Bool, Host, Port}, State) ->
Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _)
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};
(#client_conn{key = {H, P, Pid}}, _)
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};

+ 224
- 217
src/ibrowse_http_client.erl 查看文件

@ -47,11 +47,12 @@
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
lb_ets_tid, cur_pipeline_size = 0
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
-record(request, {url, method, options, from,
stream_to, req_id,
stream_to, caller_controls_socket = false,
req_id,
stream_chunk_size,
save_response_to_file = false,
tmp_file_name, tmp_file_fd,
@ -126,144 +127,15 @@ init({Host, Port}) ->
%%--------------------------------------------------------------------
%% Received a request when the remote server has already sent us a
%% Connection: Close header
handle_call({send_req, _},
_From,
#state{is_closing=true}=State) ->
handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
{reply, {error, connection_closing}, State};
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From,
#state{socket=undefined,
host=Host, port=Port}=State) ->
Resp_format = get_value(response_format, Options, list),
{Host_1, Port_1, State_1} =
case get_value(proxy_host, Options, false) of
false ->
{Host, Port, State};
PHost ->
ProxyUser = get_value(proxy_user, Options, []),
ProxyPassword = get_value(proxy_password, Options, []),
Digest = http_auth_digest(ProxyUser, ProxyPassword),
{PHost, get_value(proxy_port, Options, 80),
State#state{use_proxy = true,
proxy_auth_digest = Digest}}
end,
StreamTo = get_value(stream_to, Options, undefined),
ReqId = make_req_id(),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
NewReq = #request{url=Url,
method=Method,
stream_to=StreamTo,
options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from=From},
Reqs = queue:in(NewReq, State#state.reqs),
State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
do_trace("Connecting...~n", []),
Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
do_trace("Connected!~n", []),
End_ts = now(),
Ref = case Timeout of
infinity ->
undefined;
_ ->
Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)),
case Rem_time > 0 of
true ->
erlang:send_after(Rem_time, self(), {req_timedout, From});
false ->
shutting_down(State_2),
do_error_reply(State_2, req_timedout),
exit(normal)
end
end,
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of
ok ->
do_setopts(Sock, [{active, once}], State_2#state.is_ssl),
case StreamTo of
undefined ->
ok;
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
State_3 = inc_pipeline_counter(State_2#state{socket = Sock,
send_timer = Ref,
cur_req = NewReq,
status = get_header}),
{noreply, State_3, get_inac_timeout(State_3)};
Err ->
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
{stop, normal, State_2}
end;
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, conn_failed}),
{stop, normal, State_2}
end;
%% Request which is to be pipelined
handle_call({send_req, {Url, Headers, Method,
Body, Options, Timeout}},
From,
#state{socket=Sock, status=Status, reqs=Reqs}=State) ->
do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
Resp_format = get_value(response_format, Options, list),
StreamTo = get_value(stream_to, Options, undefined),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
ReqId = make_req_id(),
NewReq = #request{url=Url,
stream_to=StreamTo,
method=Method,
options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from=From},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
ok ->
State_2 = inc_pipeline_counter(State_1),
do_setopts(Sock, [{active, once}], State#state.is_ssl),
case Timeout of
infinity ->
ok;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
State_3 = case Status of
idle ->
State_2#state{status = get_header,
cur_req = NewReq};
_ ->
State_2
end,
case StreamTo of
undefined ->
ok;
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
{noreply, State_3, get_inac_timeout(State_3)};
Err ->
shutting_down(State_1),
do_trace("Send request failed: Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
do_error_reply(State, send_failed),
{stop, normal, State_1}
end;
From, State) ->
send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
handle_call(stop, _From, #state{socket = Socket, is_ssl = Is_ssl} = State) ->
do_close(Socket, Is_ssl),
handle_call(stop, _From, State) ->
do_close(State),
do_error_reply(State, closing_on_request),
{stop, normal, State};
@ -294,6 +166,15 @@ handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
handle_info({ssl, _Sock, Data}, State) ->
handle_sock_data(Data, State);
handle_info({stream_next, Req_id}, #state{socket = Socket,
is_ssl = Is_ssl,
cur_req = #request{req_id = Req_id}} = State) ->
do_setopts(Socket, [{active, once}], Is_ssl),
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
@ -332,12 +213,7 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
case State#state.socket of
undefined ->
ok;
Sock ->
do_close(Sock, State#state.is_ssl)
end.
do_close(State).
%%--------------------------------------------------------------------
%% Func: code_change/3
@ -358,10 +234,10 @@ handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
shutting_down(State),
do_error_reply(State, data_in_status_idle),
do_close(State#state.socket, State#state.is_ssl),
do_close(State),
{stop, normal, State};
handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
handle_sock_data(Data, #state{status = get_header}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
shutting_down(State),
@ -370,14 +246,15 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
shutting_down(State),
{stop, normal, State};
State_1 ->
do_setopts(Sock, [{active, once}], State#state.is_ssl),
active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end;
handle_sock_data(Data, #state{status=get_body, content_length=CL,
handle_sock_data(Data, #state{status = get_body,
content_length = CL,
http_status_code = StatCode,
recvd_headers=Headers,
chunk_size=CSz, socket=Sock}=State) ->
recvd_headers = Headers,
chunk_size = CSz} = State) ->
case (CL == undefined) and (CSz == undefined) of
true ->
case accumulate_response(Data, State) of
@ -387,7 +264,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
State_1 ->
do_setopts(Sock, [{active, once}], State#state.is_ssl),
active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end;
_ ->
@ -401,7 +278,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
shutting_down(State),
{stop, normal, State};
State_1 ->
do_setopts(Sock, [{active, once}], State#state.is_ssl),
active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end
end.
@ -452,22 +329,27 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
cur_req = CurReq}=State) ->
#request{stream_to=StreamTo, req_id=ReqId,
stream_chunk_size = Stream_chunk_size,
response_format = Response_format} = CurReq,
response_format = Response_format,
caller_controls_socket = Caller_controls_socket} = CurReq,
RepBuf_1 = concat_binary([RepBuf, Data]),
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
State#state{reply_buffer = RepBuf_1};
_ when New_data_size < Stream_chunk_size ->
State#state{reply_buffer = RepBuf_1};
_ ->
_ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>,
streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
accumulate_response(
Rem_data,
State#state{
reply_buffer = <<>>,
streamed_size = Streamed_size + Stream_chunk_size})
streamed_size = Streamed_size + Stream_chunk_size});
_ ->
State#state{reply_buffer = RepBuf_1}
end.
make_tmp_filename() ->
@ -528,37 +410,45 @@ do_connect(Host, Port, _Options, _State, Timeout) ->
[binary, {nodelay, true}, {active, false}],
Timeout).
do_send(Sock, Req, true) -> ssl:send(Sock, Req);
do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req).
do_send(Req, #state{socket = Sock, is_ssl = true}) -> ssl:send(Sock, Req);
do_send(Req, #state{socket = Sock, is_ssl = false}) -> gen_tcp:send(Sock, Req).
%% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), IsSSL::boolean()) -> ok | error()
%% source_descriptor() = fun_arity_0 |
%% {fun_arity_0} |
%% {fun_arity_1, term()}
%% error() = term()
do_send_body(Sock, Source, IsSSL) when is_function(Source) ->
do_send_body(Sock, {Source}, IsSSL);
do_send_body(Sock, {Source}, IsSSL) when is_function(Source) ->
do_send_body1(Sock, Source, IsSSL, Source());
do_send_body(Sock, {Source, State}, IsSSL) when is_function(Source) ->
do_send_body1(Sock, Source, IsSSL, Source(State));
do_send_body(Sock, Body, IsSSL) ->
do_send(Sock, Body, IsSSL).
do_send_body1(Sock, Source, IsSSL, Resp) ->
do_send_body(Source, State) when is_function(Source) ->
do_send_body({Source}, State);
do_send_body({Source}, State) when is_function(Source) ->
do_send_body1(Source, Source(), State);
do_send_body({Source, Source_state}, State) when is_function(Source) ->
do_send_body1(Source, Source(Source_state), State);
do_send_body(Body, State) ->
do_send(Body, State).
do_send_body1(Source, Resp, State) ->
case Resp of
{ok, Data} ->
do_send(Sock, Data, IsSSL),
do_send_body(Sock, {Source}, IsSSL);
{ok, Data, NewState} ->
do_send(Sock, Data, IsSSL),
do_send_body(Sock, {Source, NewState}, IsSSL);
eof -> ok;
Err -> Err
do_send(Data, State),
do_send_body({Source}, State);
{ok, Data, New_source_state} ->
do_send(Data, State),
do_send_body({Source, New_source_state}, State);
eof ->
ok;
Err ->
Err
end.
do_close(Sock, true) -> ssl:close(Sock);
do_close(Sock, false) -> gen_tcp:close(Sock).
do_close(#state{socket = undefined}) -> ok;
do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok;
active_once(#state{socket = Socket, is_ssl = Is_ssl}) ->
do_setopts(Socket, [{active, once}], Is_ssl).
do_setopts(Sock, Opts, true) -> ssl:setopts(Sock, Opts);
do_setopts(Sock, Opts, false) -> inet:setopts(Sock, Opts).
@ -571,11 +461,81 @@ check_ssl_options(Options, State) ->
State#state{is_ssl=true, ssl_options=get_value(ssl_options, Options)}
end.
send_req_1(#url{abspath = AbsPath,
host = Host,
port = Port,
path = RelPath} = Url,
Headers, Method, Body, Options, Sock, State) ->
send_req_1(From,
#url{host = Host,
port = Port} = Url,
Headers, Method, Body, Options, Timeout,
#state{socket = undefined} = State) ->
{Host_1, Port_1, State_1} =
case get_value(proxy_host, Options, false) of
false ->
{Host, Port, State};
PHost ->
ProxyUser = get_value(proxy_user, Options, []),
ProxyPassword = get_value(proxy_password, Options, []),
Digest = http_auth_digest(ProxyUser, ProxyPassword),
{PHost, get_value(proxy_port, Options, 80),
State#state{use_proxy = true,
proxy_auth_digest = Digest}}
end,
State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []),
Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
do_trace("Connected!~n", []),
End_ts = now(),
Timeout_1 = case Timeout of
infinity ->
infinity;
_ ->
Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
end,
State_3 = State_2#state{socket = Sock},
send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
gen_server:reply(From, {error, conn_failed}),
{stop, normal, State_2}
end;
send_req_1(From,
#url{abspath = AbsPath,
host = Host,
port = Port,
path = RelPath} = Url,
Headers, Method, Body, Options, Timeout,
#state{status = Status} = State) ->
ReqId = make_req_id(),
Resp_format = get_value(response_format, Options, list),
{StreamTo, Caller_controls_socket} =
case get_value(stream_to, Options, undefined) of
{Caller, once} when is_pid(Caller) or
is_atom(Caller) ->
Async_pid_rec = {{req_id_pid, ReqId}, self()},
true = ets:insert(ibrowse_stream, Async_pid_rec),
{Caller, true};
undefined ->
{undefined, false};
Caller when is_pid(Caller) or
is_atom(Caller) ->
{Caller, false};
Stream_to_inv ->
exit({invalid_option, {stream_to, Stream_to_inv}})
end,
SaveResponseToFile = get_value(save_response_to_file, Options, false),
NewReq = #request{url = Url,
method = Method,
stream_to = StreamTo,
caller_controls_socket = Caller_controls_socket,
options = Options,
req_id = ReqId,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
from = From},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = add_auth_headers(Url, Options, Headers, State),
HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
false ->
@ -598,14 +558,45 @@ send_req_1(#url{abspath = AbsPath,
"--- Request End ---~n", [NReq]);
_ -> ok
end,
SndRes = case do_send(Sock, Req, State#state.is_ssl) of
ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
Err ->
io:format("Err: ~p~n", [Err]),
Err
end,
do_setopts(Sock, [{active, once}], State#state.is_ssl),
SndRes.
case do_send(Req, State) of
ok ->
case do_send_body(Body_1, State) of
ok ->
State_2 = inc_pipeline_counter(State_1),
active_once(State_1),
Ref = case Timeout of
infinity ->
undefined;
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
State_3 = case Status of
idle ->
State_2#state{status = get_header,
cur_req = NewReq,
send_timer = Ref};
_ ->
State_2#state{send_timer = Ref}
end,
case StreamTo of
undefined ->
ok;
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
{noreply, State_3, get_inac_timeout(State_3)};
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
gen_server:reply(From, {error, send_failed}),
{stop, normal, State_1}
end.
add_auth_headers(#url{username = User,
password = UPw},
@ -719,9 +710,9 @@ encode_headers(L) ->
encode_headers(L, []).
encode_headers([{http_vsn, _Val} | T], Acc) ->
encode_headers(T, Acc);
encode_headers([{Name,Val} | T], Acc) when list(Name) ->
encode_headers([{Name,Val} | T], Acc) when is_list(Name) ->
encode_headers(T, [[Name, ": ", fmt_val(Val), crnl()] | Acc]);
encode_headers([{Name,Val} | T], Acc) when atom(Name) ->
encode_headers([{Name,Val} | T], Acc) when is_atom(Name) ->
encode_headers(T, [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | Acc]);
encode_headers([], Acc) ->
lists:reverse(Acc).
@ -732,25 +723,25 @@ chunk_request_body(Body, ChunkSize) ->
chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk | Acc]);
chunk_request_body(Body, ChunkSize, Acc) when binary(Body),
chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
size(Body) >= ChunkSize ->
<<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
chunk_request_body(Body, _ChunkSize, Acc) when binary(Body) ->
chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
BodySize = size(Body),
Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
Body, "\r\n"],
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
chunk_request_body(Body, ChunkSize, Acc) when list(Body),
chunk_request_body(Body, ChunkSize, Acc) when is_list(Body),
length(Body) >= ChunkSize ->
{ChunkBody, Rest} = split_list_at(Body, ChunkSize),
Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
chunk_request_body(Body, _ChunkSize, Acc) when list(Body) ->
chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
BodySize = length(Body),
Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
Body, "\r\n"],
@ -840,7 +831,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
{error, content_length_undefined};
V ->
case catch list_to_integer(V) of
V_1 when integer(V_1), V_1 >= 0 ->
V_1 when is_integer(V_1), V_1 >= 0 ->
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
do_trace("Recvd Content-Length of ~p~n", [V_1]),
State_2 = State_1#state{rep_buf_size=0,
@ -1058,17 +1049,20 @@ set_cur_request(#state{reqs = Reqs} = State) ->
parse_headers(Headers) ->
case scan_crlf(Headers) of
{yes, StatusLine, T} ->
Headers_1 = parse_headers_1(T),
case parse_status_line(StatusLine) of
{ok, HttpVsn, StatCode, _Msg} ->
put(http_prot_vsn, HttpVsn),
{HttpVsn, StatCode, Headers_1};
_ -> %% A HTTP 0.9 response?
put(http_prot_vsn, "HTTP/0.9"),
{"HTTP/0.9", undefined, Headers}
end;
_ ->
{error, no_status_line}
parse_headers(StatusLine, T);
{no, StatusLine} ->
parse_headers(StatusLine, <<>>)
end.
parse_headers(StatusLine, Headers) ->
Headers_1 = parse_headers_1(Headers),
case parse_status_line(StatusLine) of
{ok, HttpVsn, StatCode, _Msg} ->
put(http_prot_vsn, HttpVsn),
{HttpVsn, StatCode, Headers_1};
_ -> %% A HTTP 0.9 response?
put(http_prot_vsn, "HTTP/0.9"),
{"HTTP/0.9", undefined, Headers}
end.
% From RFC 2616
@ -1079,10 +1073,10 @@ parse_headers(Headers) ->
% SP. A recipient MAY replace any linear white space with a single
% SP before interpreting the field value or forwarding the message
% downstream.
parse_headers_1(B) when is_binary(B) ->
parse_headers_1(binary_to_list(B));
parse_headers_1(String) ->
parse_headers_1(String, [], []).
parse_headers_1(B) when is_binary(B) ->
parse_headers_1(binary_to_list(B));
parse_headers_1(String) ->
parse_headers_1(String, [], []).
parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
H == $\t ->
@ -1205,10 +1199,10 @@ get_crlf_pos(<<>>, _) -> no.
%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]);
%% scan_crlf([], L) -> {no, L}.
fmt_val(L) when list(L) -> L;
fmt_val(I) when integer(I) -> integer_to_list(I);
fmt_val(A) when atom(A) -> atom_to_list(A);
fmt_val(Term) -> io_lib:format("~p", [Term]).
fmt_val(L) when is_list(L) -> L;
fmt_val(I) when is_integer(I) -> integer_to_list(I);
fmt_val(A) when is_atom(A) -> atom_to_list(A);
fmt_val(Term) -> io_lib:format("~p", [Term]).
crnl() -> "\r\n".
@ -1306,7 +1300,8 @@ do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -
do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
do_reply(#state{prev_req_id = Prev_req_id} = State,
_From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
case Body of
[] ->
@ -1316,7 +1311,18 @@ do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
end,
catch StreamTo ! {ibrowse_async_response_end, ReqId},
State_1;
%% We don't want to delete the Req-id to Pid mapping straightaway
%% as the client may send a stream_next message just while we are
%% sending back this ibrowse_async_response_end message. If we
%% deleted this mapping straightaway, the caller will see a
%% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
%% get around this, we store the req id, and clear it after the
%% next request. If there are wierd combinations of stream,
%% stream_once and sync requests on the same connection, it will
%% take a while for the req_id-pid mapping to get cleared, but it
%% should do no harm.
ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
State_1#state{prev_req_id = ReqId};
do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
Msg_1 = format_response_data(Resp_format, Msg),
@ -1333,6 +1339,7 @@ do_error_reply(#state{reqs = Reqs} = State, Err) ->
ReqList = queue:to_list(Reqs),
lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format}) ->
ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
end, ReqList).

+ 7
- 1
src/ibrowse_lb.erl 查看文件

@ -151,7 +151,13 @@ handle_info({'EXIT', Pid, _Reason},
ets:match_delete(Tid, {{'_', Pid}, '_'}),
{noreply, State#state{num_cur_sessions = Cur - 1}};
handle_info({trace, Bool}, State) ->
handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) ->
catch Pid ! {trace, Bool},
Acc;
(_, Acc) ->
Acc
end, undefined, Tid),
put(my_trace_flag, Bool),
{noreply, State};

+ 6
- 6
src/ibrowse_lib.erl 查看文件

@ -49,7 +49,7 @@ drv_ue(Str, Port) ->
%% @spec url_encode(Str) -> UrlEncodedStr
%% Str = string()
%% UrlEncodedStr = string()
url_encode(Str) when list(Str) ->
url_encode(Str) when is_list(Str) ->
url_encode_char(lists:reverse(Str), []).
url_encode_char([X | T], Acc) when X >= $0, X =< $9 ->
@ -70,7 +70,7 @@ url_encode_char([], Acc) ->
d2h(N) when N<10 -> N+$0;
d2h(N) -> N+$a-10.
decode_rfc822_date(String) when list(String) ->
decode_rfc822_date(String) when is_list(String) ->
case catch decode_rfc822_date_1(string:tokens(String, ", \t\r\n")) of
{'EXIT', _} ->
{error, invalid_date};
@ -177,9 +177,9 @@ dec2hex(M,N,Ack) -> dec2hex(M-1,N bsr 4,[d2h(N band 15)|Ack]).
%% @spec encode_base64(In) -> Out
%% In = string() | binary()
%% Out = string() | binary()
encode_base64(List) when list(List) ->
encode_base64(List) when is_list(List) ->
encode_base64_1(list_to_binary(List));
encode_base64(Bin) when binary(Bin) ->
encode_base64(Bin) when is_binary(Bin) ->
List = encode_base64_1(Bin),
list_to_binary(List).
@ -197,9 +197,9 @@ encode_base64_1(<<>>) ->
%% @spec decode_base64(In) -> Out | exit({error, invalid_input})
%% In = string() | binary()
%% Out = string() | binary()
decode_base64(List) when list(List) ->
decode_base64(List) when is_list(List) ->
decode_base64_1(List, []);
decode_base64(Bin) when binary(Bin) ->
decode_base64(Bin) when is_binary(Bin) ->
List = decode_base64_1(binary_to_list(Bin), []),
list_to_binary(List).

+ 44
- 2
src/ibrowse_test.erl 查看文件

@ -18,9 +18,50 @@
ue_test/1,
verify_chunked_streaming/0,
verify_chunked_streaming/1,
i_do_async_req_list/4
i_do_async_req_list/4,
test_stream_once/3,
test_stream_once/4
]).
test_stream_once(Url, Method, Options) ->
test_stream_once(Url, Method, Options, 5000).
test_stream_once(Url, Method, Options, Timeout) ->
case ibrowse:send_req(Url, [], Method, [], [{stream_to, {self(), once}} | Options], Timeout) of
{ibrowse_req_id, Req_id} ->
case ibrowse:stream_next(Req_id) of
ok ->
test_stream_once(Req_id);
Err ->
Err
end;
Err ->
Err
end.
test_stream_once(Req_id) ->
receive
{ibrowse_async_headers, Req_id, StatCode, Headers} ->
io:format("Recvd headers~n~p~n", [{ibrowse_async_headers, Req_id, StatCode, Headers}]),
case ibrowse:stream_next(Req_id) of
ok ->
test_stream_once(Req_id);
Err ->
Err
end;
{ibrowse_async_response, Req_id, {error, Err}} ->
io:format("Recvd error: ~p~n", [Err]);
{ibrowse_async_response, Req_id, Body_1} ->
io:format("Recvd body part: ~n~p~n", [{ibrowse_async_response, Req_id, Body_1}]),
case ibrowse:stream_next(Req_id) of
ok ->
test_stream_once(Req_id);
Err ->
Err
end;
{ibrowse_async_response_end, Req_id} ->
ok
end.
%% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to
%% tweak settings before running the load test. The defaults are 10 and 10.
load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url),
@ -182,7 +223,8 @@ unit_tests() ->
unit_tests([]).
unit_tests(Options) ->
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options]),
Options_1 = Options ++ [{connect_timeout, 5000}],
{Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
receive
{done, Pid} ->
ok;

+ 1
- 1
vsn.mk 查看文件

@ -1,2 +1,2 @@
IBROWSE_VSN = 1.5.0
IBROWSE_VSN = 1.5.1

正在加载...
取消
保存