瀏覽代碼

Lots of new features

pull/16/head
chandrusf 17 年之前
父節點
當前提交
f75237940a
共有 13 個檔案被更改,包括 1223 行新增743 行删除
  1. +22
    -2
      README
  2. +126
    -51
      doc/ibrowse.html
  3. +2
    -1
      ebin/ibrowse.app
  4. +1
    -0
      src/Emakefile.src
  5. +1
    -0
      src/Makefile
  6. +2
    -1
      src/ibrowse.app.src
  7. +315
    -361
      src/ibrowse.erl
  8. +6
    -0
      src/ibrowse.hrl
  9. +246
    -290
      src/ibrowse_http_client.erl
  10. +195
    -0
      src/ibrowse_lb.erl
  11. +163
    -4
      src/ibrowse_lib.erl
  12. +143
    -32
      src/ibrowse_test.erl
  13. +1
    -1
      vsn.mk

+ 22
- 2
README 查看文件

@ -1,4 +1,4 @@
$Id: README,v 1.14 2008/02/27 23:39:22 chandrusf Exp $
$Id: README,v 1.15 2008/03/27 01:35:49 chandrusf Exp $
ibrowse is a HTTP client. The following are a list of features.
- RFC2616 compliant (AFAIK)
@ -7,7 +7,7 @@ ibrowse is a HTTP client. The following are a list of features.
- Understands HTTP/0.9, HTTP/1.0 and HTTP/1.1
- Understands chunked encoding
- Can generate requests using Chunked Transfer-Encoding
- Named pools of connections to each webserver
- Pools of connections to each webserver
- Pipelining support
- Download to file
- Asynchronous requests. Responses are streamed to a process
@ -22,6 +22,26 @@ Comments to : Chandrashekhar.Mullaparthi@t-mobile.co.uk
CONTRIBUTIONS & CHANGE HISTORY
==============================
27-03-2008 - * Major rewrite of the load balancing feature. Additional module,
ibrowse_lb.erl, introduced to achieve this.
* Can now get a handle to a connection process which is not part of
the load balancing pool. Useful when an application is making
requests to a webserver which are time consuming (such as
uploading a large file). Such requests can be put on a separate
connection, and all other smaller/quicker requests can use the
load balancing pool. See ibrowse:spawn_worker_process/2 and
ibrowse:spawn_link_worker_process/2
* Ram Krishnan sent a patch to enable a client to send a lot of
data in a request by providing a fun which is invoked by the
connection handling process. This fun can fetch the data from
any where. This is useful when trying to upload a large file
to a webserver.
* Use the TCP_NODELAY option on every socket by default
* Rudimentary support for load testing of ibrowse. Undocumented,
but see ibrowse_test:load_test/3. Use the source, Luke!
* New function ibrowse:show_dest_status/2 to view state of
connections/pipelines to a web server
20-02-2008 - Ram Krishnan sent another patch for another hidden bug in the
save_response_to_file feature.

+ 126
- 51
doc/ibrowse.html 查看文件

@ -10,15 +10,15 @@
<h1>Module ibrowse</h1>
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client.
<p>Copyright © 2005-2007 Chandrashekhar Mullaparthi</p>
<p>Copyright © 2005-2008 Chandrashekhar Mullaparthi</p>
<p><b>Version:</b> 1.2.7</p>
<p><b>Version:</b> 1.4</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>
<h2><a name="description">Description</a></h2><p>The ibrowse application implements an HTTP 1.1 client. This
module implements the API of the HTTP client. There is one named
process called 'ibrowse' which acts as a load balancer. There is
process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
one process to handle one TCP connection to a webserver
(implemented in the module ibrowse_http_client). Multiple connections to a
webserver are setup based on the settings for each webserver. The
@ -47,10 +47,6 @@ send_req/4, send_req/5, send_req/6.

{save_response_to_file, true}], 1000).
<br><br>
ibrowse:set_dest("www.hotmail.com", 80, [{max_sessions, 10},
{max_pipeline_size, 1}]).
<br><br>
ibrowse:send_req("http://www.erlang.org", [], head).
<br><br>
@ -69,21 +65,41 @@ send_req/4, send_req/5, send_req/6.

driver isn't actually used.</p>
<h2><a name="index">Function Index</a></h2>
<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#code_change-3">code_change/3</a></td><td></td></tr>
<tr><td valign="top"><a href="#finished_async_request-0">finished_async_request/0</a></td><td>Internal export.</td></tr>
<tr><td valign="top"><a href="#get_config_value-1">get_config_value/1</a></td><td>Internal export.</td></tr>
<tr><td valign="top"><a href="#get_config_value-2">get_config_value/2</a></td><td>Internal export.</td></tr>
<tr><td valign="top"><a href="#handle_call-3">handle_call/3</a></td><td></td></tr>
<tr><td valign="top"><a href="#handle_cast-2">handle_cast/2</a></td><td></td></tr>
<tr><td valign="top"><a href="#handle_info-2">handle_info/2</a></td><td></td></tr>
<tr><td valign="top"><a href="#init-1">init/1</a></td><td></td></tr>
<tr><td valign="top"><a href="#reply-2">reply/2</a></td><td>Internal export.</td></tr>
<tr><td valign="top"><a href="#rescan_config-0">rescan_config/0</a></td><td>Clear current configuration for ibrowse and load from the file
ibrowse.conf in the IBROWSE_EBIN/../priv directory.</td></tr>
<tr><td valign="top"><a href="#rescan_config-1">rescan_config/1</a></td><td></td></tr>
<tr><td valign="top"><a href="#send_req-3">send_req/3</a></td><td>This is the basic function to send a HTTP request.</td></tr>
<tr><td valign="top"><a href="#send_req-4">send_req/4</a></td><td>Same as send_req/3.</td></tr>
<tr><td valign="top"><a href="#send_req-5">send_req/5</a></td><td>Same as send_req/4.</td></tr>
<tr><td valign="top"><a href="#send_req-6">send_req/6</a></td><td>Same as send_req/5.</td></tr>
<tr><td valign="top"><a href="#set_dest-3">set_dest/3</a></td><td>Sets options for a destination.</td></tr>
<tr><td valign="top"><a href="#shutting_down-0">shutting_down/0</a></td><td>Internal export.</td></tr>
<tr><td valign="top"><a href="#start-0">start/0</a></td><td></td></tr>
<tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td></td></tr>
<tr><td valign="top"><a href="#stop-0">stop/0</a></td><td></td></tr>
<tr><td valign="top"><a href="#send_req_direct-4">send_req_direct/4</a></td><td>Same as send_req/3 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#send_req_direct-5">send_req_direct/5</a></td><td>Same as send_req/4 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#send_req_direct-6">send_req_direct/6</a></td><td>Same as send_req/5 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#send_req_direct-7">send_req_direct/7</a></td><td>Same as send_req/6 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#set_dest-3">set_dest/3</a></td><td>Deprecated.</td></tr>
<tr><td valign="top"><a href="#set_max_pipeline_size-3">set_max_pipeline_size/3</a></td><td>Set the maximum pipeline size for each connection to a specific Host:Port.</td></tr>
<tr><td valign="top"><a href="#set_max_sessions-3">set_max_sessions/3</a></td><td>Set the maximum number of connections allowed to a specific Host:Port.</td></tr>
<tr><td valign="top"><a href="#show_dest_status-2">show_dest_status/2</a></td><td>Shows some internal information about load balancing to a
specified Host:Port.</td></tr>
<tr><td valign="top"><a href="#spawn_link_worker_process-2">spawn_link_worker_process/2</a></td><td>Same as spawn_worker_process/2 except the the calling process
is linked to the worker process which is spawned.</td></tr>
<tr><td valign="top"><a href="#spawn_worker_process-2">spawn_worker_process/2</a></td><td>Creates a HTTP client process to the specified Host:Port which
is not part of the load balancing pool.</td></tr>
<tr><td valign="top"><a href="#start-0">start/0</a></td><td>Starts the ibrowse process without linking.</td></tr>
<tr><td valign="top"><a href="#start_link-0">start_link/0</a></td><td>Starts the ibrowse process linked to the calling process.</td></tr>
<tr><td valign="top"><a href="#stop-0">stop/0</a></td><td>Stop the ibrowse process.</td></tr>
<tr><td valign="top"><a href="#stop_worker_process-1">stop_worker_process/1</a></td><td>Terminate a worker process spawned using
spawn_worker_process/2 or spawn_link_worker_process/2.</td></tr>
<tr><td valign="top"><a href="#terminate-2">terminate/2</a></td><td></td></tr>
<tr><td valign="top"><a href="#trace_off-0">trace_off/0</a></td><td>Turn tracing off for the ibrowse process.</td></tr>
<tr><td valign="top"><a href="#trace_off-2">trace_off/2</a></td><td>Turn tracing OFF for all connections to the specified HTTP
@ -100,12 +116,15 @@ send_req/4, send_req/5, send_req/6.

<p><tt>code_change() -&gt; term()</tt></p>
</div>
<h3 class="function"><a name="finished_async_request-0">finished_async_request/0</a></h3>
<h3 class="function"><a name="get_config_value-1">get_config_value/1</a></h3>
<div class="spec">
<p><tt>get_config_value() -&gt; term()</tt></p>
</div><p>Internal export</p>
<h3 class="function"><a name="get_config_value-2">get_config_value/2</a></h3>
<div class="spec">
<p><tt>finished_async_request() -&gt; term()</tt></p>
</div><p>Internal export. Called by a HTTP connection process to
indicate to the load balancing process (ibrowse) that an
asynchronous request has finished processing.</p>
<p><tt>get_config_value() -&gt; term()</tt></p>
</div><p>Internal export</p>
<h3 class="function"><a name="handle_call-3">handle_call/3</a></h3>
<div class="spec">
@ -127,12 +146,18 @@ send_req/4, send_req/5, send_req/6.

<p><tt>init() -&gt; term()</tt></p>
</div>
<h3 class="function"><a name="reply-2">reply/2</a></h3>
<h3 class="function"><a name="rescan_config-0">rescan_config/0</a></h3>
<div class="spec">
<p><tt>reply() -&gt; term()</tt></p>
</div><p>Internal export. Called by a HTTP connection process to
indicate to the load balancing process (ibrowse) that a synchronous
request has finished processing.</p>
<p><tt>rescan_config() -&gt; term()</tt></p>
</div><p>Clear current configuration for ibrowse and load from the file
ibrowse.conf in the IBROWSE_EBIN/../priv directory. Current
configuration is cleared only if the ibrowse.conf file is readable
using file:consult/1</p>
<h3 class="function"><a name="rescan_config-1">rescan_config/1</a></h3>
<div class="spec">
<p><tt>rescan_config() -&gt; term()</tt></p>
</div>
<h3 class="function"><a name="send_req-3">send_req/3</a></h3>
<div class="spec">
@ -205,47 +230,97 @@ send_req/4, send_req/5, send_req/6.

</div><p>Same as send_req/5.
All timeout values are in milliseconds.</p>
<h3 class="function"><a name="send_req_direct-4">send_req_direct/4</a></h3>
<div class="spec">
<p><tt>send_req_direct() -&gt; term()</tt></p>
</div><p>Same as send_req/3 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2</p>
<h3 class="function"><a name="send_req_direct-5">send_req_direct/5</a></h3>
<div class="spec">
<p><tt>send_req_direct() -&gt; term()</tt></p>
</div><p>Same as send_req/4 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2</p>
<h3 class="function"><a name="send_req_direct-6">send_req_direct/6</a></h3>
<div class="spec">
<p><tt>send_req_direct() -&gt; term()</tt></p>
</div><p>Same as send_req/5 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2</p>
<h3 class="function"><a name="send_req_direct-7">send_req_direct/7</a></h3>
<div class="spec">
<p><tt>send_req_direct() -&gt; term()</tt></p>
</div><p>Same as send_req/6 except that the first argument is the PID
returned by spawn_worker_process/2 or spawn_link_worker_process/2</p>
<h3 class="function"><a name="set_dest-3">set_dest/3</a></h3>
<div class="spec">
<p><tt>set_dest(Host::string(), Port::integer(), Opts::<a href="#type-opt_list">opt_list()</a>) -&gt; ok</tt>
<ul class="definitions"><li><tt><a name="type-opt_list">opt_list()</a> = [opt]</tt></li>
<li><tt><a name="type-opt">opt()</a> = {max_sessions, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>}</tt></li>
</ul></p>
</div><p>Sets options for a destination. If the options have not been
set in the ibrowse.conf file, it can be set using this function
before sending the first request to the destination. If not,
defaults will be used. Entries in ibrowse.conf look like this.
<code><br>
{dest, Host, Port, MaxSess, MaxPipe, Options}.<br>
where <br>
Host = string(). "www.erlang.org" | "193.180.168.23"<br>
Port = integer()<br>
MaxSess = integer()<br>
MaxPipe = integer()<br>
Options = optionList() -- see options in send_req/5<br>
</code></p>
<p><tt>set_dest() -&gt; term()</tt></p>
</div><p>Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
for achieving the same effect.</p>
<h3 class="function"><a name="set_max_pipeline_size-3">set_max_pipeline_size/3</a></h3>
<div class="spec">
<p><tt>set_max_pipeline_size(Host::string(), Port::integer(), Max::integer()) -&gt; ok</tt></p>
</div><p>Set the maximum pipeline size for each connection to a specific Host:Port.</p>
<h3 class="function"><a name="set_max_sessions-3">set_max_sessions/3</a></h3>
<div class="spec">
<p><tt>set_max_sessions(Host::string(), Port::integer(), Max::integer()) -&gt; ok</tt></p>
</div><p>Set the maximum number of connections allowed to a specific Host:Port.</p>
<h3 class="function"><a name="show_dest_status-2">show_dest_status/2</a></h3>
<div class="spec">
<p><tt>show_dest_status() -&gt; term()</tt></p>
</div><p>Shows some internal information about load balancing to a
specified Host:Port. Info about workers spawned using
spawn_worker_process/2 or spawn_link_worker_process/2 is not
included.</p>
<h3 class="function"><a name="shutting_down-0">shutting_down/0</a></h3>
<h3 class="function"><a name="spawn_link_worker_process-2">spawn_link_worker_process/2</a></h3>
<div class="spec">
<p><tt>shutting_down() -&gt; term()</tt></p>
</div><p>Internal export. Called by a HTTP connection process to
indicate to ibrowse that it is shutting down and further requests
should not be sent it's way.</p>
<p><tt>spawn_link_worker_process() -&gt; term()</tt></p>
</div><p>Same as spawn_worker_process/2 except the the calling process
is linked to the worker process which is spawned.</p>
<h3 class="function"><a name="spawn_worker_process-2">spawn_worker_process/2</a></h3>
<div class="spec">
<p><tt>spawn_worker_process(Host::string(), Port::integer()) -&gt; {ok, pid()}</tt></p>
</div><p>Creates a HTTP client process to the specified Host:Port which
is not part of the load balancing pool. This is useful in cases
where some requests to a webserver might take a long time whereas
some might take a very short time. To avoid getting these quick
requests stuck in the pipeline behind time consuming requests, use
this function to get a handle to a connection process. <br>
<b>Note:</b> Calling this function only creates a worker process. No connection
is setup. The connection attempt is made only when the first
request is sent via any of the send_req_direct/4,5,6,7 functions.<br>
<b>Note:</b> It is the responsibility of the calling process to control
pipeline size on such connections.
</p>
<h3 class="function"><a name="start-0">start/0</a></h3>
<div class="spec">
<p><tt>start() -&gt; term()</tt></p>
</div>
</div><p>Starts the ibrowse process without linking. Useful when testing using the shell</p>
<h3 class="function"><a name="start_link-0">start_link/0</a></h3>
<div class="spec">
<p><tt>start_link() -&gt; term()</tt></p>
</div>
<p><tt>start_link() -&gt; {ok, pid()}</tt></p>
</div><p>Starts the ibrowse process linked to the calling process. Usually invoked by the supervisor ibrowse_sup</p>
<h3 class="function"><a name="stop-0">stop/0</a></h3>
<div class="spec">
<p><tt>stop() -&gt; term()</tt></p>
</div>
</div><p>Stop the ibrowse process. Useful when testing using the shell.</p>
<h3 class="function"><a name="stop_worker_process-1">stop_worker_process/1</a></h3>
<div class="spec">
<p><tt>stop_worker_process(Conn_pid::pid()) -&gt; ok</tt></p>
</div><p>Terminate a worker process spawned using
spawn_worker_process/2 or spawn_link_worker_process/2. Requests in
progress will get the error response <pre>{error, closing_on_request}</pre></p>
<h3 class="function"><a name="terminate-2">terminate/2</a></h3>
<div class="spec">
@ -279,6 +354,6 @@ send_req/4, send_req/5, send_req/6.

<hr>
<div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div>
<p><i>Generated by EDoc, Feb 7 2008, 11:49:30.</i></p>
<p><i>Generated by EDoc, Mar 27 2008, 01:03:49.</i></p>
</body>
</html>

+ 2
- 1
ebin/ibrowse.app 查看文件

@ -1,10 +1,11 @@
{application, ibrowse,
[{description, "HTTP client application"},
{vsn, "1.2.5"},
{vsn, "1.4"},
{modules, [ ibrowse,
ibrowse_http_client,
ibrowse_app,
ibrowse_sup,
ibrowse_lb,
ibrowse_lib ]},
{registered, []},
{applications, [kernel,stdlib,sasl]},

+ 1
- 0
src/Emakefile.src 查看文件

@ -3,4 +3,5 @@
'../src/ibrowse_app'.
'../src/ibrowse_sup'.
'../src/ibrowse_lib'.
'../src/ibrowse_lb'.
'../src/ibrowse_test'.

+ 1
- 0
src/Makefile 查看文件

@ -3,6 +3,7 @@ ERL_FILES = ibrowse.erl \
ibrowse_app.erl \
ibrowse_sup.erl \
ibrowse_lib.erl \
ibrowse_lb.erl \
ibrowse_test.erl

+ 2
- 1
src/ibrowse.app.src 查看文件

@ -5,7 +5,8 @@
ibrowse_http_client,
ibrowse_app,
ibrowse_sup,
ibrowse_lib ]},
ibrowse_lib,
ibrowse_lb ]},
{registered, []},
{applications, [kernel,stdlib,sasl]},
{env, []},

+ 315
- 361
src/ibrowse.erl 查看文件

@ -6,11 +6,11 @@
%%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2007 Chandrashekhar Mullaparthi
%% @version 1.2.7
%% @copyright 2005-2008 Chandrashekhar Mullaparthi
%% @version 1.4
%% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which acts as a load balancer. There is
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
%% one process to handle one TCP connection to a webserver
%% (implemented in the module ibrowse_http_client). Multiple connections to a
%% webserver are setup based on the settings for each webserver. The
@ -39,10 +39,6 @@
%% {save_response_to_file, true}], 1000).
%% <br/><br/>
%%
%% ibrowse:set_dest("www.hotmail.com", 80, [{max_sessions, 10},
%% {max_pipeline_size, 1}]).
%% <br/><br/>
%%
%% ibrowse:send_req("http://www.erlang.org", [], head).
%%
%% <br/><br/>
@ -61,7 +57,7 @@
%% driver isn't actually used.</p>
-module(ibrowse).
-vsn('$Id: ibrowse.erl,v 1.5 2008/02/07 12:02:13 chandrusf Exp $ ').
-vsn('$Id: ibrowse.erl,v 1.6 2008/03/27 01:35:50 chandrusf Exp $ ').
-behaviour(gen_server).
%%--------------------------------------------------------------------
@ -77,46 +73,51 @@
terminate/2, code_change/3]).
%% API interface
-export([send_req/3,
-export([
rescan_config/0,
rescan_config/1,
get_config_value/1,
get_config_value/2,
spawn_worker_process/2,
spawn_link_worker_process/2,
stop_worker_process/1,
send_req/3,
send_req/4,
send_req/5,
send_req/6,
send_req_direct/4,
send_req_direct/5,
send_req_direct/6,
send_req_direct/7,
set_max_sessions/3,
set_max_pipeline_size/3,
set_dest/3,
trace_on/0,
trace_off/0,
trace_on/2,
trace_off/2,
set_dest/3]).
%% Internal exports
-export([reply/2,
finished_async_request/0,
shutting_down/0]).
show_dest_status/2
]).
-ifdef(debug).
-compile(export_all).
-endif.
-import(ibrowse_http_client, [parse_url/1,
printable_date/0]).
-record(state, {dests=[], trace=false, port}).
-import(ibrowse_lib, [
parse_url/1,
printable_date/0,
get_value/2,
get_value/3,
do_trace/2
]).
-record(state, {trace = false}).
-include("ibrowse.hrl").
-define(DEF_MAX_SESSIONS,10).
-define(DEF_MAX_PIPELINE_SIZE,10).
%% key = {Host, Port} where Host is a string, or {Name, Host, Port}
%% where Name is an atom.
%% conns = queue()
-record(dest, {key,
conns=queue:new(),
num_sessions=0,
max_sessions=?DEF_MAX_SESSIONS,
max_pipeline_size=?DEF_MAX_PIPELINE_SIZE,
options=[],
trace=false}).
%%====================================================================
%% External functions
%%====================================================================
@ -124,36 +125,19 @@
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
%% @doc Starts the ibrowse process linked to the calling process. Usually invoked by the supervisor ibrowse_sup
%% @spec start_link() -> {ok, pid()}
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% @doc Starts the ibrowse process without linking. Useful when testing using the shell
start() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], [{debug, []}]).
%% @doc Stop the ibrowse process. Useful when testing using the shell.
stop() ->
catch gen_server:call(ibrowse, stop).
%% @doc Sets options for a destination. If the options have not been
%% set in the ibrowse.conf file, it can be set using this function
%% before sending the first request to the destination. If not,
%% defaults will be used. Entries in ibrowse.conf look like this.
%% <code><br/>
%% {dest, Host, Port, MaxSess, MaxPipe, Options}.<br/>
%% where <br/>
%% Host = string(). "www.erlang.org" | "193.180.168.23"<br/>
%% Port = integer()<br/>
%% MaxSess = integer()<br/>
%% MaxPipe = integer()<br/>
%% Options = optionList() -- see options in send_req/5<br/>
%% </code>
%% @spec set_dest(Host::string(),Port::integer(),Opts::opt_list()) -> ok
%% opt_list() = [opt]
%% opt() = {max_sessions, integer()} |
%% {max_pipeline_size, integer()} |
%% {trace, boolean()}
set_dest(Host,Port,Opts) ->
gen_server:call(?MODULE,{set_dest,Host,Port,Opts}).
%% @doc This is the basic function to send a HTTP request.
%% The Status return value indicates the HTTP status code returned by the webserver
%% @spec send_req(Url::string(), Headers::headerList(), Method::method()) -> response()
@ -173,9 +157,12 @@ send_req(Url, Headers, Method) ->
send_req(Url, Headers, Method, [], []).
%% @doc Same as send_req/3.
%% If a list is specified for the body it has to be a flat list.
%% If a list is specified for the body it has to be a flat list. The body can also be a fun/0 or a fun/1. <br/>
%% If fun/0, the connection handling process will repeatdely call the fun until it returns an error or eof. <pre>Fun() = {ok, Data} | eof</pre><br/>
%% If fun/1, the connection handling process will repeatedly call the fun with the supplied state until it returns an error or eof. <pre>Fun(State) = {ok, Data} | {ok, Data, NewState} | eof</pre>
%% @spec send_req(Url, Headers, Method::method(), Body::body()) -> response()
%% body() = [] | string() | binary()
%% body() = [] | string() | binary() | fun_arity_0() | {fun_arity_1(), initial_state()}
%% initial_state() = term()
send_req(Url, Headers, Method, Body) ->
send_req(Url, Headers, Method, Body, []).
@ -241,22 +228,143 @@ send_req(Url, Headers, Method, Body, Options) ->
%% @spec send_req(Url, Headers::headerList(), Method::method(), Body::body(), Options::optionList(), Timeout) -> response()
%% Timeout = integer() | infinity
send_req(Url, Headers, Method, Body, Options, Timeout) ->
Timeout_1 = case Timeout of
infinity ->
infinity;
_ ->
Timeout + 1000
case catch parse_url(Url) of
#url{host = Host,
port = Port} = Parsed_url ->
Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
get_lb_pid(Parsed_url);
[#lb_pid{pid = Lb_pid_1}] ->
Lb_pid_1
end,
Max_sessions = get_max_sessions(Host, Port, Options),
Max_pipeline_size = get_max_pipeline_size(Host, Port, Options),
{SSLOptions, IsSSL} =
case get_value(is_ssl, Options, false) of
false -> {[], false};
true -> {get_value(ssl_options, Options), true}
end,
case catch gen_server:call(ibrowse,
{send_req, [Url, Headers, Method,
Body, Options, Timeout]},
Timeout_1) of
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL}) of
{ok, Conn_Pid} ->
do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options, Timeout);
Err ->
Err
end;
Err ->
{error, {url_parsing_failed, Err}}
end.
get_lb_pid(Url) ->
gen_server:call(?MODULE, {get_lb_pid, Url}).
get_max_sessions(Host, Port, Options) ->
get_value(max_sessions, Options,
get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
get_max_pipeline_size(Host, Port, Options) ->
get_value(max_pipeline_size, Options,
get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect.
set_dest(Host, Port, [{max_sessions, Max} | T]) ->
set_max_sessions(Host, Port, Max),
set_dest(Host, Port, T);
set_dest(Host, Port, [{max_pipeline_size, Max} | T]) ->
set_max_pipeline_size(Host, Port, Max),
set_dest(Host, Port, T);
set_dest(Host, Port, [{trace, Bool} | T]) when Bool == true; Bool == false ->
ibrowse ! {trace, true, Host, Port},
set_dest(Host, Port, T);
set_dest(_Host, _Port, [H | _]) ->
exit({invalid_option, H});
set_dest(_, _, []) ->
ok.
%% @doc Set the maximum number of connections allowed to a specific Host:Port.
%% @spec set_max_sessions(Host::string(), Port::integer(), Max::integer()) -> ok
set_max_sessions(Host, Port, Max) when is_integer(Max), Max > 0 ->
gen_server:call(?MODULE, {set_config_value, {max_sessions, Host, Port}, Max}).
%% @doc Set the maximum pipeline size for each connection to a specific Host:Port.
%% @spec set_max_pipeline_size(Host::string(), Port::integer(), Max::integer()) -> ok
set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 ->
gen_server:call(?MODULE, {set_config_value, {max_pipeline_size, Host, Port}, Max}).
do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url,
Headers, Method, Body,
Options, Timeout) of
{'EXIT', {timeout, _}} ->
{error, genserver_timedout};
Res ->
Res
{error, req_timedout};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
Ret ->
Ret
end.
%% @doc Creates a HTTP client process to the specified Host:Port which
%% is not part of the load balancing pool. This is useful in cases
%% where some requests to a webserver might take a long time whereas
%% some might take a very short time. To avoid getting these quick
%% requests stuck in the pipeline behind time consuming requests, use
%% this function to get a handle to a connection process. <br/>
%% <b>Note:</b> Calling this function only creates a worker process. No connection
%% is setup. The connection attempt is made only when the first
%% request is sent via any of the send_req_direct/4,5,6,7 functions.<br/>
%% <b>Note:</b> It is the responsibility of the calling process to control
%% pipeline size on such connections.
%%
%% @spec spawn_worker_process(Host::string(), Port::integer()) -> {ok, pid()}
spawn_worker_process(Host, Port) ->
ibrowse_http_client:start({Host, Port}).
%% @doc Same as spawn_worker_process/2 except the the calling process
%% is linked to the worker process which is spawned.
spawn_link_worker_process(Host, Port) ->
ibrowse_http_client:start_link({Host, Port}).
%% @doc Terminate a worker process spawned using
%% spawn_worker_process/2 or spawn_link_worker_process/2. Requests in
%% progress will get the error response <pre>{error, closing_on_request}</pre>
%% @spec stop_worker_process(Conn_pid::pid()) -> ok
stop_worker_process(Conn_pid) ->
ibrowse_http_client:stop(Conn_pid).
%% @doc Same as send_req/3 except that the first argument is the PID
%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method) ->
send_req_direct(Conn_pid, Url, Headers, Method, [], []).
%% @doc Same as send_req/4 except that the first argument is the PID
%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body) ->
send_req_direct(Conn_pid, Url, Headers, Method, Body, []).
%% @doc Same as send_req/5 except that the first argument is the PID
%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body, Options) ->
send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, 30000).
%% @doc Same as send_req/6 except that the first argument is the PID
%% returned by spawn_worker_process/2 or spawn_link_worker_process/2
send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
case catch parse_url(Url) of
#url{} = Parsed_url ->
case do_send_req(Conn_pid, Parsed_url, Headers, Method, Body, Options, Timeout) of
{error, {'EXIT', {noproc, _}}} ->
{error, worker_is_dead};
Ret ->
Ret
end;
Err ->
{error, {url_parsing_failed, Err}}
end.
%% @doc Turn tracing on for the ibrowse process
trace_on() ->
ibrowse ! {trace, true}.
@ -278,23 +386,53 @@ trace_on(Host, Port) ->
trace_off(Host, Port) ->
ibrowse ! {trace, false, Host, Port}.
%% @doc Internal export. Called by a HTTP connection process to
%% indicate to the load balancing process (ibrowse) that a synchronous
%% request has finished processing.
reply(OrigCaller, Reply) ->
gen_server:call(ibrowse, {reply, OrigCaller, Reply, self()}).
%% @doc Shows some internal information about load balancing to a
%% specified Host:Port. Info about workers spawned using
%% spawn_worker_process/2 or spawn_link_worker_process/2 is not
%% included.
show_dest_status(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid}] ->
io:format("Load Balancer Pid : ~p~n", [Lb_pid]),
io:format("LB process msg q size : ~p~n", [(catch process_info(Lb_pid, message_queue_len))]),
case lists:dropwhile(
fun(Tid) ->
ets:info(Tid, owner) /= Lb_pid
end, ets:all()) of
[] ->
io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]);
[Tid | _] ->
First = ets:first(Tid),
Last = ets:last(Tid),
Size = ets:info(Tid, size),
io:format("LB ETS table id : ~p~n", [Tid]),
io:format("Num Connections : ~p~n", [Size]),
case Size of
0 ->
ok;
_ ->
{First_p_sz, _} = First,
{Last_p_sz, _} = Last,
io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]),
io:format("Largest pipeline : ~1000.p~n", [Last_p_sz])
end
end
end.
%% @doc Internal export. Called by a HTTP connection process to
%% indicate to the load balancing process (ibrowse) that an
%% asynchronous request has finished processing.
finished_async_request() ->
gen_server:call(ibrowse, {finished_async_request, self()}).
%% @doc Clear current configuration for ibrowse and load from the file
%% ibrowse.conf in the IBROWSE_EBIN/../priv directory. Current
%% configuration is cleared only if the ibrowse.conf file is readable
%% using file:consult/1
rescan_config() ->
gen_server:call(?MODULE, rescan_config).
%% @doc Internal export. Called by a HTTP connection process to
%% indicate to ibrowse that it is shutting down and further requests
%% should not be sent it's way.
shutting_down() ->
gen_server:call(ibrowse, {shutting_down, self()}).
%% Clear current configuration for ibrowse and load from the specified
%% file. Current configuration is cleared only if the specified
%% file is readable using file:consult/1
rescan_config(File) when is_list(File) ->
gen_server:call(?MODULE, {rescan_config, File}).
%%====================================================================
%% Server functions
@ -312,32 +450,66 @@ init(_) ->
process_flag(trap_exit, true),
State = #state{},
put(my_trace_flag, State#state.trace),
put(ibrowse_trace_token, "ibrowse"),
ets:new(ibrowse_lb, [named_table, public, {keypos, 2}]),
ets:new(ibrowse_conf, [named_table, protected, {keypos, 2}]),
import_config(),
{ok, #state{}}.
import_config() ->
case code:priv_dir(ibrowse) of
{error, _} ->
{ok, #state{}};
{error, _} = Err ->
Err;
PrivDir ->
Filename = filename:join(PrivDir, "ibrowse.conf"),
case file:consult(Filename) of
{ok, Terms} ->
Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}, Acc)
when list(Host), integer(Port),
integer(MaxSess), MaxSess > 0,
integer(MaxPipe), MaxPipe > 0, list(Options) ->
Key = maybe_named_key(Host, Port, Options),
NewDest = #dest{key=Key,
options=Options,
max_sessions=MaxSess,
max_pipeline_size=MaxPipe},
[NewDest | Acc];
(_, Acc) ->
Acc
end,
{ok, #state{dests=lists:foldl(Fun, [], Terms)}};
_Else ->
{ok, #state{}}
end
import_config(Filename)
end.
import_config(Filename) ->
case file:consult(Filename) of
{ok, Terms} ->
ets:delete_all_objects(ibrowse_conf),
Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options})
when list(Host), integer(Port),
integer(MaxSess), MaxSess > 0,
integer(MaxPipe), MaxPipe > 0, list(Options) ->
I = [{{max_sessions, Host, Port}, MaxSess},
{{max_pipeline_size, Host, Port}, MaxPipe},
{{options, Host, Port}, Options}],
lists:foreach(
fun({X, Y}) ->
ets:insert(ibrowse_conf,
#ibrowse_conf{key = X,
value = Y})
end, I);
({K, V}) ->
ets:insert(ibrowse_conf,
#ibrowse_conf{key = K,
value = V});
(X) ->
io:format("Skipping unrecognised term: ~p~n", [X])
end,
lists:foreach(Fun, Terms);
Err ->
Err
end.
%% @doc Internal export
get_config_value(Key) ->
[#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key),
V.
%% @doc Internal export
get_config_value(Key, DefVal) ->
case ets:lookup(ibrowse_conf, Key) of
[] ->
DefVal;
[#ibrowse_conf{value = V}] ->
V
end.
set_config_value(Key, Val) ->
ets:insert(ibrowse_conf, #ibrowse_conf{key = Key, value = Val}).
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
@ -348,46 +520,28 @@ init(_) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call({send_req, _}=Req, From, State) ->
State_1 = handle_send_req(Req, From, State),
{noreply, State_1};
handle_call({reply, OrigCaller, Reply, HttpClientPid}, From, State) ->
gen_server:reply(From, ok),
gen_server:reply(OrigCaller, Reply),
Key = {HttpClientPid, pending_reqs},
case get(Key) of
NumPend when integer(NumPend) ->
put(Key, NumPend - 1);
_ ->
ok
end,
{noreply, State};
handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
Pid = do_get_connection(Url, ets:lookup(ibrowse_lb, {Host, Port})),
{reply, Pid, State};
handle_call({finished_async_request, HttpClientPid}, From, State) ->
gen_server:reply(From, ok),
Key = {HttpClientPid, pending_reqs},
case get(Key) of
NumPend when integer(NumPend) ->
put(Key, NumPend - 1);
_ ->
ok
end,
{noreply, State};
handle_call(stop, _From, State) ->
do_trace("IBROWSE shutting down~n", []),
{stop, normal, ok, State};
handle_call({shutting_down, Pid}, _From, State) ->
State_1 = handle_conn_closing(Pid, State),
{reply, ok, State_1};
handle_call({set_dest,Host,Port,Opts}, _From, State) ->
State2 = set_destI(State,Host,Port,Opts),
{reply, ok, State2};
handle_call({set_config_value, Key, Val}, _From, State) ->
set_config_value(Key, Val),
{reply, ok, State};
handle_call(stop, _From, State) ->
{stop, shutting_down, ok, State};
handle_call(rescan_config, _From, State) ->
Ret = (catch import_config()),
{reply, Ret, State};
handle_call(_Request, _From, State) ->
Reply = ok,
handle_call({rescan_config, File}, _From, State) ->
Ret = (catch import_config(File)),
{reply, Ret, State};
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
%%--------------------------------------------------------------------
@ -408,53 +562,27 @@ handle_cast(_Msg, State) ->
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
%% A bit of a bodge here...ideally, would be good to store connection state
%% in the queue itself against each Pid.
handle_info({done_req, Pid}, State) ->
Key = {Pid, pending_reqs},
case get(Key) of
NumPend when integer(NumPend) ->
put(Key, NumPend - 1);
_ ->
ok
end,
do_trace("~p has finished a request~n", [Pid]),
{noreply, State};
handle_info({'EXIT', _, normal}, State) ->
{noreply, State};
handle_info({'EXIT', Pid, _Reason}, State) ->
%% TODO: We have to reply to all the pending requests
State_1 = handle_conn_closing(Pid, State),
do_trace("~p has exited~n", [Pid]),
{noreply, State_1};
handle_info({shutting_down, Pid}, State) ->
State_1 = handle_conn_closing(Pid, State),
{noreply, State_1};
handle_info({conn_closing, Pid, OriReq, From}, State) ->
State_1 = handle_conn_closing(Pid, State),
State_2 = handle_send_req(OriReq, From, State_1),
{noreply, State_2};
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State#state{trace=Bool}};
handle_info({trace, Bool, Host, Port}, #state{dests=Dests}=State) ->
case lists:keysearch({Host, Port}, #dest.key, Dests) of
{value, Dest} ->
lists:foreach(fun(ConnPid) ->
ConnPid ! {trace, Bool}
end, queue:to_list(Dest#dest.conns)),
{noreply, State#state{dests=lists:keyreplace({Host,Port}, #dest.key, Dests, Dest#dest{trace=Bool})}};
false ->
do_trace("Not found any state information for specified Host, Port.~n", []),
{noreply, State}
end;
{noreply, State};
handle_info({trace, Bool, Host, Port}, State) ->
Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _)
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};
(#client_conn{key = {H, P, Pid}}, _)
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};
(_, Acc) ->
Acc
end,
ets:foldl(Fun, undefined, ibrowse_lb),
ets:insert(ibrowse_conf, #ibrowse_conf{key = {trace, Host, Port},
value = Bool}),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@ -477,183 +605,9 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
handle_send_req({send_req, [Url, _Headers, _Method, _Body, Options, _Timeout]}=Req,
From, State) ->
case get_host_port(Url, Options) of
{Host, Port, _RelPath} ->
Key = maybe_named_key(Host, Port, Options),
case lists:keysearch(Key, #dest.key, State#state.dests) of
false ->
{ok, Pid} = spawn_new_connection(Key, false, Options),
Pid ! {Req, From},
Q = queue:new(),
Q_1 = queue:in(Pid, Q),
NewDest = #dest{key=Key,conns=Q_1,num_sessions=1}, %% MISSING is_ssl
State#state{dests=[NewDest|State#state.dests]};
{value, #dest{conns=Conns,
num_sessions=NumS,
max_pipeline_size=MaxPSz,
max_sessions=MaxS}=Dest} ->
case get_free_worker(Conns, NumS, MaxS, MaxPSz) of
spawn_new_connection ->
do_trace("Spawning new connection~n", []),
{ok, Pid} = spawn_new_connection(Key, Dest#dest.trace, Dest#dest.options),
Pid ! {Req, From},
Q_1 = queue:in(Pid, Conns),
Dest_1 = Dest#dest{conns=Q_1, num_sessions=NumS+1},
State#state{dests=lists:keyreplace(Key, #dest.key, State#state.dests, Dest_1)};
not_found ->
do_trace("State -> ~p~nPDict -> ~p~n", [State, get()]),
gen_server:reply(From, {error, retry_later}),
State;
{ok, Pid, _, ConnPids} ->
do_trace("Reusing existing pid: ~p~n", [Pid]),
Pid_key = {Pid, pending_reqs},
put(Pid_key, get(Pid_key) + 1),
Pid ! {Req, From},
State#state{dests=lists:keyreplace(Key, #dest.key, State#state.dests,Dest#dest{conns=ConnPids})}
end
end;
invalid_uri ->
gen_server:reply(From, {error, invalid_uri}),
State
end.
get_host_port(Url, Options) ->
case get_value(proxy_host, Options, false) of
false ->
case parse_url(Url) of
#url{host=H, port=P, path=Path} ->
{H, P, Path};
_Err ->
invalid_uri
end;
PxyHost ->
PxyPort = get_value(proxy_port, Options, 80),
{PxyHost, PxyPort, Url}
end.
handle_conn_closing(Pid, #state{dests=Dests}=State) ->
erase({Pid, pending_reqs}),
HostKey = get({Pid, hostport}),
erase({Pid, hostport}),
do_trace("~p is shutting down~n", [Pid]),
case lists:keysearch(HostKey, #dest.key, Dests) of
{value, #dest{num_sessions=Num, conns=Q}=Dest} ->
State#state{dests=lists:keyreplace(HostKey, #dest.key, Dests,
Dest#dest{conns=del_from_q(Q, Num, Pid), num_sessions=Num-1})};
false ->
State
end.
%% Replaces destination information if found, otherwise appends it.
%% Copies over Connection Queue and Number of sessions.
set_destI(State,Host,Port,Opts) ->
#state{dests=DestList} = State,
Key = maybe_named_key(Host, Port, Opts),
NewDests = case lists:keysearch(Key, #dest.key, DestList) of
false ->
Dest = insert_opts(Opts,#dest{key=Key}),
[Dest | DestList];
{value, OldDest} ->
OldDest_1 = insert_opts(Opts, OldDest),
[OldDest_1 | (DestList -- [OldDest])]
end,
State#state{dests=NewDests}.
insert_opts(Opts, Dest) ->
insert_opts_1(Opts, Dest#dest{options=Opts}).
insert_opts_1([],Dest) -> Dest;
insert_opts_1([{max_sessions,Msess}|T],Dest) ->
insert_opts_1(T,Dest#dest{max_sessions=Msess});
insert_opts_1([{max_pipeline_size,Mpls}|T],Dest) ->
insert_opts_1(T,Dest#dest{max_pipeline_size=Mpls});
insert_opts_1([{trace,Bool}|T],Dest) when Bool==true; Bool==false ->
insert_opts_1(T,Dest#dest{trace=Bool});
insert_opts_1([_|T],Dest) -> %% ignores other
insert_opts_1(T,Dest).
% Picks out the worker with the minimum pipeline size
% If a worker is found with a non-zero pipeline size, but the number of sessins
% is less than the max allowed sessions, a new connection is spawned.
get_free_worker(Q, NumSessions, MaxSessions, MaxPSz) ->
case get_free_worker_1(Q, NumSessions, MaxPSz, {undefined, undefined}) of
not_found when NumSessions < MaxSessions ->
spawn_new_connection;
not_found ->
not_found;
{ok, Pid, PSz, _Q1} when NumSessions < MaxSessions, PSz > 0 ->
do_trace("Found Pid -> ~p. PSz -> ~p~n", [Pid, PSz]),
spawn_new_connection;
Ret ->
do_trace("get_free_worker: Ret -> ~p~n", [Ret]),
Ret
end.
get_free_worker_1(_, 0, _, {undefined, undefined}) ->
not_found;
get_free_worker_1({{value, WorkerPid}, Q}, 0, _, {MinPSzPid, PSz}) ->
{ok, MinPSzPid, PSz, queue:in(WorkerPid, Q)};
get_free_worker_1({{value, Pid}, Q1}, NumSessions, MaxPSz, {_MinPSzPid, MinPSz}=V) ->
do_trace("Pid -> ~p. MaxPSz -> ~p MinPSz -> ~p~n", [Pid, MaxPSz, MinPSz]),
case get({Pid, pending_reqs}) of
NumP when NumP < MaxPSz, NumP < MinPSz ->
get_free_worker_1(queue:out(queue:in(Pid, Q1)), NumSessions-1, MaxPSz, {Pid, NumP});
_ ->
get_free_worker_1(queue:out(queue:in(Pid, Q1)), NumSessions-1, MaxPSz, V)
end;
get_free_worker_1({empty, _Q}, _, _, _) ->
do_trace("Queue empty -> not_found~n", []),
not_found;
get_free_worker_1(Q, NumSessions, MaxPSz, MinPSz) ->
get_free_worker_1(queue:out(Q), NumSessions, MaxPSz, MinPSz).
spawn_new_connection({_Pool_name, Host, Port}, Trace, Options) ->
spawn_new_connection({Host, Port}, Trace, Options);
spawn_new_connection({Host, Port}, Trace, Options) ->
{ok, Pid} = ibrowse_http_client:start_link([Host, Port, Trace, Options]),
Key = maybe_named_key(Host, Port, Options),
put({Pid, pending_reqs}, 1),
put({Pid, hostport}, Key),
{ok, Pid}.
del_from_q({empty, Q}, _, _) ->
Q;
del_from_q({{value, V}, Q}, 0, _Elem) ->
queue:in(V, Q);
del_from_q({{value, Elem}, Q1}, QSize, Elem) ->
del_from_q(queue:out(Q1), QSize-1, Elem);
del_from_q({{value, V}, Q}, QSize, Elem) ->
del_from_q(queue:out(queue:in(V, Q)), QSize-1, Elem);
del_from_q(Q, QSize, Elem) ->
del_from_q(queue:out(Q), QSize, Elem).
maybe_named_key(Host, Port, Opts) ->
case lists:keysearch(name, 1, Opts) of
{value, {name, Pool_name}} when is_atom(Pool_name) ->
{Pool_name, Host, Port};
_ ->
{Host, Port}
end.
% get_value(Tag, TVL) ->
% {value, {_, V}} = lists:keysearch(Tag,1,TVL),
% V.
get_value(Tag, TVL, DefVal) ->
case lists:keysearch(Tag, 1, TVL) of
{value, {_, V}} ->
V;
false ->
DefVal
end.
do_trace(Fmt, Args) ->
do_trace(get(my_trace_flag), Fmt, Args).
% do_trace(true, Fmt, Args) ->
% io:format("~s -- IBROWSE - "++Fmt, [printable_date() | Args]);
do_trace(true, Fmt, Args) ->
io:format("~s -- IBROWSE - "++Fmt, [printable_date() | Args]);
do_trace(_, _, _) -> ok.
do_get_connection(#url{host = Host, port = Port}, []) ->
{ok, Pid} = ibrowse_lb:start_link([Host, Port]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = Pid}),
Pid;
do_get_connection(_Url, [#lb_pid{pid = Pid}]) ->
Pid.

+ 6
- 0
src/ibrowse.hrl 查看文件

@ -3,4 +3,10 @@
-record(url, {abspath, host, port, username, password, path, protocol}).
-record(lb_pid, {host_port, pid}).
-record(client_conn, {key, cur_pipeline_size = 0, reqs_served = 0}).
-record(ibrowse_conf, {key, value}).
-endif.

+ 246
- 290
src/ibrowse_http_client.erl 查看文件

@ -6,7 +6,7 @@
%%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%%-------------------------------------------------------------------
-module(ibrowse_http_client).
-vsn('$Id: ibrowse_http_client.erl,v 1.16 2008/02/27 23:39:23 chandrusf Exp $ ').
-vsn('$Id: ibrowse_http_client.erl,v 1.17 2008/03/27 01:35:50 chandrusf Exp $ ').
-behaviour(gen_server).
%%--------------------------------------------------------------------
@ -15,33 +15,51 @@
%%--------------------------------------------------------------------
%% External exports
-export([start_link/1]).
-export([
start_link/1,
start/1,
stop/1,
send_req/7
]).
-ifdef(debug).
-compile(export_all).
-endif.
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([parse_url/1,
printable_date/0]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-include("ibrowse.hrl").
-record(state, {host, port, use_proxy = false, proxy_auth_digest,
ssl_options=[], is_ssl, socket,
-record(state, {host, port,
use_proxy = false, proxy_auth_digest,
ssl_options = [], is_ssl = false, socket,
reqs=queue:new(), cur_req, status=idle, http_status_code,
reply_buffer=[], rep_buf_size=0, recvd_headers=[],
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding, chunk_size,
chunks=[]}).
chunks=[], lb_ets_tid, cur_pipeline_size = 0}).
-record(request, {url, method, options, from,
stream_to, req_id,
save_response_to_file = false,
tmp_file_name, tmp_file_fd}).
-import(ibrowse_lib, [
parse_url/1,
printable_date/0,
get_value/2,
get_value/3,
do_trace/2
]).
%%====================================================================
%% External functions
%%====================================================================
@ -49,9 +67,26 @@
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
start(Args) ->
gen_server:start(?MODULE, Args, []).
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) ->
gen_server:call(Conn_pid, stop).
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
Timeout_1 = case Timeout of
infinity ->
infinity;
_ when is_integer(Timeout) ->
Timeout + 100
end,
gen_server:call(
Conn_Pid,
{send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout_1).
%%====================================================================
%% Server functions
%%====================================================================
@ -64,15 +99,20 @@ start_link(Args) ->
%% ignore |
%% {stop, Reason}
%%--------------------------------------------------------------------
init([Host, Port, Trace, Options]) ->
{SSLOptions, IsSSL} = case get_value(is_ssl, Options, false) of
false -> {[], false};
true -> {get_value(ssl_options, Options), true}
end,
State = #state{host=Host, port=Port, is_ssl=IsSSL, ssl_options=SSLOptions},
put(ibrowse_http_client_host, Host),
put(ibrowse_http_client_port, Port),
put(my_trace_flag, Trace),
init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
State = #state{host = Host,
port = Port,
ssl_options = SSLOptions,
is_ssl = Is_ssl,
lb_ets_tid = Lb_Tid},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, State};
init({Host, Port}) ->
State = #state{host = Host,
port = Port},
put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
{ok, State}.
%%--------------------------------------------------------------------
@ -85,48 +125,29 @@ init([Host, Port, Trace, Options]) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
%% Received a request when the remote server has already sent us a
%% Connection: Close header
handle_info({{send_req, Req}, From}, #state{is_closing=true}=State) ->
ibrowse ! {conn_closing, self(), {send_req, Req}, From},
{noreply, State};
handle_call({send_req, _},
_From,
#state{is_closing=true}=State) ->
{reply, {error, connection_closing}, State};
%% First request when no connection exists.
handle_info({{send_req, [Url, Headers, Method,
Body, Options, Timeout]}, From},
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From,
#state{socket=undefined,
host=Host, port=Port}=State) ->
State_1 = case get_value(proxy_host, Options, false) of
false ->
State;
_PHost ->
ProxyUser = get_value(proxy_user, Options, []),
ProxyPassword = get_value(proxy_password, Options, []),
Digest = http_auth_digest(ProxyUser, ProxyPassword),
State#state{use_proxy = true,
proxy_auth_digest = Digest}
end,
{Host_1, Port_1, State_1} =
case get_value(proxy_host, Options, false) of
false ->
{Host, Port, State};
PHost ->
ProxyUser = get_value(proxy_user, Options, []),
ProxyPassword = get_value(proxy_password, Options, []),
Digest = http_auth_digest(ProxyUser, ProxyPassword),
{PHost, get_value(proxy_port, Options, 80),
State#state{use_proxy = true,
proxy_auth_digest = Digest}}
end,
StreamTo = get_value(stream_to, Options, undefined),
ReqId = make_req_id(),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
@ -146,7 +167,7 @@ handle_info({{send_req, [Url, Headers, Method,
_ ->
round(Timeout*0.9)
end,
case do_connect(Host, Port, Options, State_2, Timeout_1) of
case do_connect(Host_1, Port_1, Options, State_2, Timeout_1) of
{ok, Sock} ->
Ref = case Timeout of
infinity ->
@ -163,26 +184,28 @@ handle_info({{send_req, [Url, Headers, Method,
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
{noreply, State_2#state{socket=Sock,
send_timer = Ref,
cur_req = NewReq,
status=get_header}};
State_3 = inc_pipeline_counter(State_2#state{socket = Sock,
send_timer = Ref,
cur_req = NewReq,
status = get_header}),
{noreply, State_3};
Err ->
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
ibrowse:shutting_down(),
ibrowse:reply(From, {error, send_failed}),
gen_server:reply(From, {error, send_failed}),
{stop, normal, State_2}
end;
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
ibrowse:shutting_down(),
ibrowse:reply(From, {error, conn_failed}),
gen_server:reply(From, {error, conn_failed}),
{stop, normal, State_2}
end;
%% Request which is to be pipelined
handle_info({{send_req, [Url, Headers, Method,
Body, Options, Timeout]}, From},
handle_call({send_req, {Url, Headers, Method,
Body, Options, Timeout}},
From,
#state{socket=Sock, status=Status, reqs=Reqs}=State) ->
do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
StreamTo = get_value(stream_to, Options, undefined),
@ -198,6 +221,7 @@ handle_info({{send_req, [Url, Headers, Method,
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
ok ->
State_2 = inc_pipeline_counter(State_1),
do_setopts(Sock, [{active, true}], State#state.is_ssl),
case Timeout of
infinity ->
@ -205,31 +229,54 @@ handle_info({{send_req, [Url, Headers, Method,
_ ->
erlang:send_after(Timeout, self(), {req_timedout, From})
end,
State_2 = case Status of
State_3 = case Status of
idle ->
State_1#state{status = get_header,
State_2#state{status = get_header,
cur_req = NewReq};
_ ->
State_1
State_2
end,
case StreamTo of
undefined ->
ok;
_ ->
%% We don't use ibrowse:reply here because we are
%% just sending back the request ID. Not the
%% response
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
{noreply, State_2};
{noreply, State_3};
Err ->
shutting_down(State_1),
do_trace("Send request failed: Reason: ~p~n", [Err]),
ibrowse:reply(From, {error, send_failed}),
gen_server:reply(From, {error, send_failed}),
do_error_reply(State, send_failed),
ibrowse:shutting_down(),
{stop, normal, State_1}
end;
handle_call(stop, _From, #state{socket = Socket, is_ssl = Is_ssl} = State) ->
do_close(Socket, Is_ssl),
do_error_reply(State, closing_on_request),
{stop, normal, State};
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, State) ->
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
@ -249,13 +296,12 @@ handle_info({req_timedout, From}, State) ->
false ->
{noreply, State};
{value, _} ->
ibrowse:shutting_down(),
shutting_down(State),
do_error_reply(State, req_timedout),
{stop, normal, State}
end;
handle_info({trace, Bool}, State) ->
do_trace("Turning trace on: Host: ~p Port: ~p~n", [State#state.host, State#state.port]),
put(my_trace_flag, Bool),
{noreply, State};
@ -293,7 +339,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
ibrowse:shutting_down(),
shutting_down(State),
do_error_reply(State, data_in_status_idle),
do_close(State#state.socket, State#state.is_ssl),
{stop, normal, State};
@ -301,10 +347,10 @@ handle_sock_data(Data, #state{status=idle}=State) ->
handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
ibrowse:shutting_down(),
shutting_down(State),
{stop, normal, State};
stop ->
ibrowse:shutting_down(),
shutting_down(State),
{stop, normal, State};
State_1 ->
do_setopts(Sock, [{active, true}], State#state.is_ssl),
@ -319,7 +365,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
ibrowse:shutting_down(),
shutting_down(State),
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
@ -330,12 +376,12 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
_ ->
case parse_11_response(Data, State) of
{error, Reason} ->
ibrowse:shutting_down(),
shutting_down(State),
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
stop ->
ibrowse:shutting_down(),
shutting_down(State),
{stop, normal, State};
State_1 ->
do_setopts(Sock, [{active, true}], State#state.is_ssl),
@ -398,7 +444,7 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
end.
make_tmp_filename() ->
DownloadDir = safe_get_env(ibrowse, download_dir, filename:absname("./")),
DownloadDir = ibrowse:get_config_value(download_dir, filename:absname("./")),
{A,B,C} = now(),
filename:join([DownloadDir,
"ibrowse_tmp_file_"++
@ -411,11 +457,11 @@ make_tmp_filename() ->
%% Handles the case when the server closes the socket
%%--------------------------------------------------------------------
handle_sock_closed(#state{status=get_header}=State) ->
ibrowse:shutting_down(),
shutting_down(State),
do_error_reply(State, connection_closed);
handle_sock_closed(#state{cur_req=undefined}) ->
ibrowse:shutting_down();
handle_sock_closed(#state{cur_req=undefined} = State) ->
shutting_down(State);
%% We check for IsClosing because this the server could have sent a
%% Connection-Close header and has closed the socket to indicate end
@ -431,27 +477,55 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
{_, Reqs_1} = queue:out(Reqs),
case TmpFilename of
undefined ->
do_reply(From, StreamTo, ReqId,
do_reply(State, From, StreamTo, ReqId,
{ok, SC, Headers,
lists:flatten(lists:reverse(Buf))});
_ ->
file:close(Fd),
do_reply(From, StreamTo, ReqId,
do_reply(State, From, StreamTo, ReqId,
{ok, SC, Headers, {file, TmpFilename}})
end,
do_error_reply(State#state{reqs = Reqs_1}, connection_closed);
do_error_reply(State#state{reqs = Reqs_1}, connection_closed),
State;
_ ->
do_error_reply(State, connection_closed)
do_error_reply(State, connection_closed),
State
end.
do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) ->
ssl:connect(Host, Port, [{active, false} | SSLOptions], Timeout);
ssl:connect(Host, Port, [{nodelay, true}, {active, false} | SSLOptions], Timeout);
do_connect(Host, Port, _Options, _State, Timeout) ->
gen_tcp:connect(Host, Port, [{active, false}], Timeout).
gen_tcp:connect(Host, Port, [{nodelay, true}, {active, false}], Timeout).
do_send(Sock, Req, true) -> ssl:send(Sock, Req);
do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req).
%% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), IsSSL::boolean()) -> ok | error()
%% source_descriptor() = fun_arity_0 |
%% {fun_arity_0} |
%% {fun_arity_1, term()}
%% error() = term()
do_send_body(Sock, Source, IsSSL) when is_function(Source) ->
do_send_body(Sock, {Source}, IsSSL);
do_send_body(Sock, {Source}, IsSSL) when is_function(Source) ->
do_send_body1(Sock, Source, IsSSL, Source());
do_send_body(Sock, {Source, State}, IsSSL) when is_function(Source) ->
do_send_body1(Sock, Source, IsSSL, Source(State));
do_send_body(Sock, Body, IsSSL) ->
do_send(Sock, Body, IsSSL).
do_send_body1(Sock, Source, IsSSL, Resp) ->
case Resp of
{ok, Data} ->
do_send(Sock, Data, IsSSL),
do_send_body(Sock, {Source}, IsSSL);
{ok, Data, NewState} ->
do_send(Sock, Data, IsSSL),
do_send_body(Sock, {Source, NewState}, IsSSL);
eof -> ok;
Err -> Err
end.
do_close(Sock, true) -> ssl:close(Sock);
do_close(Sock, false) -> gen_tcp:close(Sock).
@ -466,12 +540,12 @@ check_ssl_options(Options, State) ->
State#state{is_ssl=true, ssl_options=get_value(ssl_options, Options)}
end.
send_req_1(Url, Headers, Method, Body, Options, Sock, State) ->
#url{abspath = AbsPath,
host = Host,
port = Port,
path = RelPath} = Url_1 = parse_url(Url),
Headers_1 = add_auth_headers(Url_1, Options, Headers, State),
send_req_1(#url{abspath = AbsPath,
host = Host,
port = Port,
path = RelPath} = Url,
Headers, Method, Body, Options, Sock, State) ->
Headers_1 = add_auth_headers(Url, Options, Headers, State),
HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
false ->
case Port of
@ -481,18 +555,24 @@ send_req_1(Url, Headers, Method, Body, Options, Sock, State) ->
{value, {_, Host_h_val}} ->
Host_h_val
end,
Req = make_request(Method,
[{"Host", HostHeaderValue} | Headers_1],
AbsPath, RelPath, Body, Options, State#state.use_proxy),
case get(my_trace_flag) of %%Avoid the binary operations if trace is not on...
{Req, Body_1} = make_request(Method,
[{"Host", HostHeaderValue} | Headers_1],
AbsPath, RelPath, Body, Options, State#state.use_proxy),
case get(my_trace_flag) of
true ->
%%Avoid the binary operations if trace is not on...
NReq = binary_to_list(list_to_binary(Req)),
do_trace("Sending request: ~n"
"--- Request Begin ---~n~s~n"
"--- Request End ---~n", [NReq]);
_ -> ok
end,
SndRes = do_send(Sock, Req, State#state.is_ssl),
SndRes = case do_send(Sock, Req, State#state.is_ssl) of
ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
Err ->
io:format("Err: ~p~n", [Err]),
Err
end,
do_setopts(Sock, [{active, true}], State#state.is_ssl),
SndRes.
@ -549,16 +629,20 @@ e(X) -> exit({bad_encode_base64_token, X}).
make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 = case get_value(content_length, Headers, false) of
false when (Body == []) or (Body == <<>>) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
false ->
[{"content-length", integer_to_list(length(Body))} | Headers];
true ->
Headers
end,
Headers_1 =
case get_value(content_length, Headers, false) of
false when (Body == []) or
(Body == <<>>) or
is_tuple(Body) or
is_function(Body) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
false ->
[{"content-length", integer_to_list(length(Body))} | Headers];
_ ->
Headers
end,
{Headers_2, Body_1} =
case get_value(transfer_encoding, Options, false) of
false ->
@ -578,7 +662,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
false ->
RelPath
end,
[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl(), Body_1].
{[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}.
http_vsn_string({0,9}) -> "HTTP/0.9";
http_vsn_string({1,0}) -> "HTTP/1.0";
@ -649,7 +733,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
cur_req=CurReq}=State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
method=Method} = CurReq,
MaxHeaderSize = safe_get_env(ibrowse, max_headers_size, infinity),
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
case scan_header(Data, Acc) of
{yes, Headers, Data_1} ->
do_trace("Recvd Header Data -> ~s~n----~n", [Headers]),
@ -661,7 +745,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
IsClosing = is_connection_closing(HttpVsn, ConnClose),
case IsClosing of
true ->
ibrowse:shutting_down();
shutting_down(State);
false ->
ok
end,
@ -673,9 +757,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
_ when Method == head ->
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
do_reply(From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
cancel_timer(State#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
_ when hd(StatCode) == $1 ->
@ -692,9 +776,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
%% RFC2616 - Sec 4.4
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
do_reply(From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
cancel_timer(State#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1),
State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
parse_response(Data_1, State_3);
_ when TransferEncoding == "chunked" ->
@ -914,9 +998,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
do_reply(From, StreamTo, ReqId, {ok, SCode, RespHeaders, ResponseBody}),
State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, ResponseBody}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
State_1;
State_2;
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId},
#state{http_status_code=SCode, recvd_headers=RespHeaders,
reply_buffer=RepBuf, transfer_encoding=TEnc,
@ -930,12 +1014,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId},
State_1 = set_cur_request(State),
case get(conn_close) of
"close" ->
do_reply(From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
exit(normal);
_ ->
do_reply(From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
State_1
State_2
end.
reset_state(State) ->
@ -1108,134 +1192,38 @@ is_whitespace($\t) -> true;
is_whitespace(_) -> false.
parse_url(Url) ->
parse_url(Url, get_protocol, #url{abspath=Url}, []).
parse_url([$:, $/, $/ | _], get_protocol, Url, []) ->
{invalid_uri_1, Url};
parse_url([$:, $/, $/ | T], get_protocol, Url, TmpAcc) ->
Prot = list_to_atom(lists:reverse(TmpAcc)),
parse_url(T, get_username,
Url#url{protocol = Prot},
[]);
parse_url([$/ | T], get_username, Url, TmpAcc) ->
%% No username/password. No port number
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = [$/ | T]};
parse_url([$: | T], get_username, Url, TmpAcc) ->
%% It is possible that no username/password has been
%% specified. But we'll continue with the assumption that there is
%% a username/password. If we encounter a '@' later on, there is a
%% username/password indeed. If we encounter a '/', it was
%% actually the hostname
parse_url(T, get_password,
Url#url{username = lists:reverse(TmpAcc)},
[]);
parse_url([$@ | T], get_username, Url, TmpAcc) ->
parse_url(T, get_host,
Url#url{username = lists:reverse(TmpAcc),
password = ""},
[]);
parse_url([$@ | T], get_password, Url, TmpAcc) ->
parse_url(T, get_host,
Url#url{password = lists:reverse(TmpAcc)},
[]);
parse_url([$/ | T], get_password, Url, TmpAcc) ->
%% Ok, what we thought was the username/password was the hostname
%% and portnumber
#url{username=User} = Url,
Port = list_to_integer(lists:reverse(TmpAcc)),
Url#url{host = User,
port = Port,
username = undefined,
password = undefined,
path = [$/ | T]};
parse_url([$: | T], get_host, #url{} = Url, TmpAcc) ->
parse_url(T, get_port,
Url#url{host = lists:reverse(TmpAcc)},
[]);
parse_url([$/ | T], get_host, #url{protocol=Prot} = Url, TmpAcc) ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Prot),
path = [$/ | T]};
parse_url([$/ | T], get_port, #url{protocol=Prot} = Url, TmpAcc) ->
Port = case TmpAcc of
[] ->
default_port(Prot);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{port = Port, path = [$/ | T]};
parse_url([H | T], State, Url, TmpAcc) ->
parse_url(T, State, Url, [H | TmpAcc]);
parse_url([], get_host, Url, TmpAcc) when TmpAcc /= [] ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = "/"};
parse_url([], get_username, Url, TmpAcc) when TmpAcc /= [] ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = "/"};
parse_url([], get_port, #url{protocol=Prot} = Url, TmpAcc) ->
Port = case TmpAcc of
[] ->
default_port(Prot);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{port = Port,
path = "/"};
parse_url([], get_password, Url, TmpAcc) ->
%% Ok, what we thought was the username/password was the hostname
%% and portnumber
#url{username=User} = Url,
Port = case TmpAcc of
[] ->
default_port(Url#url.protocol);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{host = User,
port = Port,
username = undefined,
password = undefined,
path = "/"};
parse_url([], State, Url, TmpAcc) ->
{invalid_uri_2, State, Url, TmpAcc}.
default_port(http) -> 80;
default_port(https) -> 443;
default_port(ftp) -> 21.
send_async_headers(_ReqId, undefined, _StatCode, _Headers) ->
ok;
send_async_headers(ReqId, StreamTo, StatCode, Headers) ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}.
do_reply(From, undefined, _, Msg) ->
ibrowse:reply(From, Msg);
do_reply(_From, StreamTo, ReqId, {ok, _, _, _}) ->
ibrowse:finished_async_request(),
catch StreamTo ! {ibrowse_async_response_end, ReqId};
do_reply(_From, StreamTo, ReqId, Msg) ->
catch StreamTo ! {ibrowse_async_response, ReqId, Msg}.
do_reply(State, From, undefined, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
do_reply(State, _From, StreamTo, ReqId, {ok, _, _, _}) ->
State_1 = dec_pipeline_counter(State),
catch StreamTo ! {ibrowse_async_response_end, ReqId},
State_1;
do_reply(State, _From, StreamTo, ReqId, Msg) ->
State_1 = dec_pipeline_counter(State),
catch StreamTo ! {ibrowse_async_response, ReqId, Msg},
State_1.
do_interim_reply(undefined, _ReqId, _Msg) ->
ok;
do_interim_reply(StreamTo, ReqId, Msg) ->
catch StreamTo ! {ibrowse_async_response, ReqId, Msg}.
do_error_reply(#state{reqs = Reqs}, Err) ->
do_error_reply(#state{reqs = Reqs} = State, Err) ->
ReqList = queue:to_list(Reqs),
lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId}) ->
do_reply(From, StreamTo, ReqId, {error, Err})
do_reply(State, From, StreamTo, ReqId, {error, Err})
end, ReqList).
fail_pipelined_requests(#state{reqs = Reqs, cur_req = CurReq} = State, Reply) ->
{_, Reqs_1} = queue:out(Reqs),
#request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq,
do_reply(From, StreamTo, ReqId, Reply),
do_reply(State, From, StreamTo, ReqId, Reply),
do_error_reply(State#state{reqs = Reqs_1}, previous_request_failed).
@ -1248,18 +1236,6 @@ split_list_at(List2, 0, List1) ->
split_list_at([H | List2], N, List1) ->
split_list_at(List2, N-1, [H | List1]).
get_value(Tag, TVL) ->
{value, {_, V}} = lists:keysearch(Tag,1,TVL),
V.
get_value(Tag, TVL, DefVal) ->
case lists:keysearch(Tag, 1, TVL) of
{value, {_, V}} ->
V;
false ->
DefVal
end.
hexlist_to_integer(List) ->
hexlist_to_integer(lists:reverse(List), 1, 0).
hexlist_to_integer([H | T], Multiplier, Acc) ->
@ -1290,14 +1266,6 @@ to_ascii($8) -> 8;
to_ascii($9) -> 9;
to_ascii($0) -> 0.
safe_get_env(App, EnvVar, DefaultValue) ->
case application:get_env(App,EnvVar) of
undefined ->
DefaultValue;
{ok, V} ->
V
end.
cancel_timer(undefined) -> ok;
cancel_timer(Ref) -> erlang:cancel_timer(Ref).
@ -1313,38 +1281,6 @@ cancel_timer(Ref, {eat_message, Msg}) ->
make_req_id() ->
now().
do_trace(Fmt, Args) ->
do_trace(get(my_trace_flag), Fmt, Args).
% Useful for debugging
% do_trace(_, Fmt, Args) ->
% io:format("~s -- CLI(~p,~p) - "++Fmt, [printable_date(),
% get(ibrowse_http_client_host),
% get(ibrowse_http_client_port) | Args]);
do_trace(true, Fmt, Args) ->
io:format("~s -- CLI(~p,~p) - "++Fmt,
[printable_date(),
get(ibrowse_http_client_host),
get(ibrowse_http_client_port) | Args]);
do_trace(_, _, _) -> ok.
printable_date() ->
{{Y,Mo,D},{H, M, S}} = calendar:local_time(),
{_,_,MicroSecs} = now(),
[integer_to_list(Y),
$-,
integer_to_list(Mo),
$-,
integer_to_list(D),
$_,
integer_to_list(H),
$:,
integer_to_list(M),
$:,
integer_to_list(S),
$:,
integer_to_list(MicroSecs div 1000)].
to_lower(Str) ->
to_lower(Str, []).
to_lower([H|T], Acc) when H >= $A, H =< $Z ->
@ -1354,3 +1290,23 @@ to_lower([H|T], Acc) ->
to_lower([], Acc) ->
lists:reverse(Acc).
shutting_down(#state{lb_ets_tid = undefined}) ->
ok;
shutting_down(#state{lb_ets_tid = Tid,
cur_pipeline_size = Sz}) ->
ets:delete(Tid, {Sz, self()}).
inc_pipeline_counter(#state{is_closing = true} = State) ->
State;
inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
State#state{cur_pipeline_size = Pipe_sz + 1}.
dec_pipeline_counter(#state{is_closing = true} = State) ->
State;
dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
State;
dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
lb_ets_tid = Tid} = State) ->
ets:delete(Tid, {Pipe_sz, self()}),
ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1}.

+ 195
- 0
src/ibrowse_lb.erl 查看文件

@ -0,0 +1,195 @@
%%%-------------------------------------------------------------------
%%% File : ibrowse_lb.erl
%%% Author : chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%% Description :
%%%
%%% Created : 6 Mar 2008 by chandru <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%%-------------------------------------------------------------------
-module(ibrowse_lb).
-vsn('$Id: ibrowse_lb.erl,v 1.1 2008/03/27 01:36:21 chandrusf Exp $ ').
-author(chandru).
-behaviour(gen_server).
%%--------------------------------------------------------------------
%% Include files
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% External exports
-export([
start_link/1,
spawn_connection/5
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-record(state, {parent_pid,
ets_tid,
host,
port,
max_sessions,
max_pipeline_size,
num_cur_sessions = 0}).
-import(ibrowse_lib, [
parse_url/1,
printable_date/0,
get_value/3
]).
-include("ibrowse.hrl").
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
%%====================================================================
%% Server functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%--------------------------------------------------------------------
init([Host, Port]) ->
process_flag(trap_exit, true),
Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10),
Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
{ok, #state{parent_pid = whereis(ibrowse),
host = Host,
port = Port,
ets_tid = Tid,
max_pipeline_size = Max_pipe_sz,
max_sessions = Max_sessions}}.
spawn_connection(Lb_pid, Url,
Max_sessions,
Max_pipeline_size,
SSL_options)
when is_pid(Lb_pid),
is_record(Url, url),
is_integer(Max_pipeline_size),
is_integer(Max_sessions) ->
gen_server:call(Lb_pid,
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
% handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
% #state{max_sessions = Max_sess,
% ets_tid = Tid,
% max_pipeline_size = Max_pipe_sz,
% num_cur_sessions = Num} = State)
% when Num >= Max ->
% Reply = find_best_connection(Tid),
% {reply, sorry_dude_reuse, State};
%% Update max_sessions in #state with supplied value
handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
#state{ets_tid = Tid,
num_cur_sessions = Num} = State)
when Num >= Max_sess ->
Reply = find_best_connection(Tid, Max_pipe),
{reply, Reply, State#state{max_sessions = Max_sess}};
handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
#state{num_cur_sessions = Cur,
ets_tid = Tid} = State) ->
{ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}),
ets:insert(Tid, {{1, Pid}, []}),
{reply, {ok, Pid}, State#state{num_cur_sessions = Cur + 1}};
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({'EXIT', Parent, _Reason}, #state{parent_pid = Parent} = State) ->
{stop, normal, State};
handle_info({'EXIT', Pid, _Reason},
#state{num_cur_sessions = Cur,
ets_tid = Tid} = State) ->
ets:match_delete(Tid, {{'_', Pid}, '_'}),
{noreply, State#state{num_cur_sessions = Cur - 1}};
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
find_best_connection(Tid, Max_pipe) ->
case ets:first(Tid) of
{Cur_sz, Pid} when Cur_sz < Max_pipe ->
ets:delete(Tid, {Cur_sz, Pid}),
ets:insert(Tid, {{Cur_sz + 1, Pid}, []}),
{ok, Pid};
_ ->
{error, retry_later}
end.

+ 163
- 4
src/ibrowse_lib.erl 查看文件

@ -5,22 +5,35 @@
%% @doc Module with a few useful functions
-module(ibrowse_lib).
-vsn('$Id: ibrowse_lib.erl,v 1.5 2007/04/20 00:36:30 chandrusf Exp $ ').
-vsn('$Id: ibrowse_lib.erl,v 1.6 2008/03/27 01:35:50 chandrusf Exp $ ').
-author('chandru').
-ifdef(debug).
-compile(export_all).
-endif.
-export([url_encode/1,
-include("ibrowse.hrl").
-export([
get_trace_status/2,
do_trace/2,
do_trace/3,
url_encode/1,
decode_rfc822_date/1,
status_code/1,
dec2hex/2,
drv_ue/1,
drv_ue/2,
encode_base64/1,
decode_base64/1
decode_base64/1,
get_value/2,
get_value/3,
parse_url/1,
printable_date/0
]).
get_trace_status(Host, Port) ->
ibrowse:get_config_value({trace, Host, Port}, false).
drv_ue(Str) ->
[{port, Port}| _] = ets:lookup(ibrowse_table, port),
drv_ue(Str, Port).
@ -53,7 +66,7 @@ url_encode_char([X | T], Acc) ->
url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]);
url_encode_char([], Acc) ->
Acc.
d2h(N) when N<10 -> N+$0;
d2h(N) -> N+$a-10.
@ -238,3 +251,149 @@ b64_to_int(X) when X >= $a, X =< $z -> X - $a + 26;
b64_to_int(X) when X >= $0, X =< $9 -> X - $0 + 52;
b64_to_int($+) -> 62;
b64_to_int($/) -> 63.
get_value(Tag, TVL, DefVal) ->
case lists:keysearch(Tag, 1, TVL) of
false ->
DefVal;
{value, {_, Val}} ->
Val
end.
get_value(Tag, TVL) ->
{value, {_, V}} = lists:keysearch(Tag,1,TVL),
V.
parse_url(Url) ->
parse_url(Url, get_protocol, #url{abspath=Url}, []).
parse_url([$:, $/, $/ | _], get_protocol, Url, []) ->
{invalid_uri_1, Url};
parse_url([$:, $/, $/ | T], get_protocol, Url, TmpAcc) ->
Prot = list_to_atom(lists:reverse(TmpAcc)),
parse_url(T, get_username,
Url#url{protocol = Prot},
[]);
parse_url([$/ | T], get_username, Url, TmpAcc) ->
%% No username/password. No port number
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = [$/ | T]};
parse_url([$: | T], get_username, Url, TmpAcc) ->
%% It is possible that no username/password has been
%% specified. But we'll continue with the assumption that there is
%% a username/password. If we encounter a '@' later on, there is a
%% username/password indeed. If we encounter a '/', it was
%% actually the hostname
parse_url(T, get_password,
Url#url{username = lists:reverse(TmpAcc)},
[]);
parse_url([$@ | T], get_username, Url, TmpAcc) ->
parse_url(T, get_host,
Url#url{username = lists:reverse(TmpAcc),
password = ""},
[]);
parse_url([$@ | T], get_password, Url, TmpAcc) ->
parse_url(T, get_host,
Url#url{password = lists:reverse(TmpAcc)},
[]);
parse_url([$/ | T], get_password, Url, TmpAcc) ->
%% Ok, what we thought was the username/password was the hostname
%% and portnumber
#url{username=User} = Url,
Port = list_to_integer(lists:reverse(TmpAcc)),
Url#url{host = User,
port = Port,
username = undefined,
password = undefined,
path = [$/ | T]};
parse_url([$: | T], get_host, #url{} = Url, TmpAcc) ->
parse_url(T, get_port,
Url#url{host = lists:reverse(TmpAcc)},
[]);
parse_url([$/ | T], get_host, #url{protocol=Prot} = Url, TmpAcc) ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Prot),
path = [$/ | T]};
parse_url([$/ | T], get_port, #url{protocol=Prot} = Url, TmpAcc) ->
Port = case TmpAcc of
[] ->
default_port(Prot);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{port = Port, path = [$/ | T]};
parse_url([H | T], State, Url, TmpAcc) ->
parse_url(T, State, Url, [H | TmpAcc]);
parse_url([], get_host, Url, TmpAcc) when TmpAcc /= [] ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = "/"};
parse_url([], get_username, Url, TmpAcc) when TmpAcc /= [] ->
Url#url{host = lists:reverse(TmpAcc),
port = default_port(Url#url.protocol),
path = "/"};
parse_url([], get_port, #url{protocol=Prot} = Url, TmpAcc) ->
Port = case TmpAcc of
[] ->
default_port(Prot);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{port = Port,
path = "/"};
parse_url([], get_password, Url, TmpAcc) ->
%% Ok, what we thought was the username/password was the hostname
%% and portnumber
#url{username=User} = Url,
Port = case TmpAcc of
[] ->
default_port(Url#url.protocol);
_ ->
list_to_integer(lists:reverse(TmpAcc))
end,
Url#url{host = User,
port = Port,
username = undefined,
password = undefined,
path = "/"};
parse_url([], State, Url, TmpAcc) ->
{invalid_uri_2, State, Url, TmpAcc}.
default_port(http) -> 80;
default_port(https) -> 443;
default_port(ftp) -> 21.
printable_date() ->
{{Y,Mo,D},{H, M, S}} = calendar:local_time(),
{_,_,MicroSecs} = now(),
[integer_to_list(Y),
$-,
integer_to_list(Mo),
$-,
integer_to_list(D),
$_,
integer_to_list(H),
$:,
integer_to_list(M),
$:,
integer_to_list(S),
$:,
integer_to_list(MicroSecs div 1000)].
do_trace(Fmt, Args) ->
do_trace(get(my_trace_flag), Fmt, Args).
-ifdef(DEBUG).
do_trace(_, Fmt, Args) ->
io:format("~s -- (~s) - "++Fmt,
[printable_date(),
get(ibrowse_trace_token) | Args]).
-else.
do_trace(true, Fmt, Args) ->
io:format("~s -- (~s) - "++Fmt,
[printable_date(),
get(ibrowse_trace_token) | Args]);
do_trace(_, _, _) ->
ok.
-endif.

+ 143
- 32
src/ibrowse_test.erl 查看文件

@ -4,50 +4,157 @@
%%% Created : 14 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
-module(ibrowse_test).
-vsn('$Id: ibrowse_test.erl,v 1.1 2005/05/05 22:28:28 chandrusf Exp $ ').
-vsn('$Id: ibrowse_test.erl,v 1.2 2008/03/27 01:35:50 chandrusf Exp $ ').
-export([
load_test/3,
send_reqs_1/3,
do_send_req/2,
unit_tests/0,
unit_tests/1,
drv_ue_test/0,
drv_ue_test/1,
ue_test/0,
ue_test/1
]).
-compile(export_all).
-import(ibrowse_http_client, [printable_date/0]).
-import(ibrowse_lib, [printable_date/0]).
send_reqs(Url, NumWorkers, NumReqsPerWorker) ->
%% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to
%% tweak settings before running the load test. The defaults are 10 and 10.
load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url),
is_integer(NumWorkers),
is_integer(NumReqsPerWorker),
NumWorkers > 0,
NumReqsPerWorker > 0 ->
proc_lib:spawn(?MODULE, send_reqs_1, [Url, NumWorkers, NumReqsPerWorker]).
send_reqs_1(Url, NumWorkers, NumReqsPerWorker) ->
Start_time = now(),
ets:new(pid_table, [named_table, public]),
ets:new(ibrowse_test_results, [named_table, public]),
init_results(),
process_flag(trap_exit, true),
Pids = lists:map(fun(_X) ->
proc_lib:spawn_link(?MODULE, do_send_req, [Url, NumReqsPerWorker, self()])
end, lists:seq(1,NumWorkers)),
put(num_reqs_per_worker, NumReqsPerWorker),
do_wait(Pids, now(), printable_date(), 0, 0).
do_wait([], _StartNow, StartTime, NumSucc, NumErrs) ->
io:format("~n~nDone...~nStartTime -> ~s~n", [StartTime]),
io:format("EndTime -> ~s~n", [printable_date()]),
io:format("NumSucc -> ~p~n", [NumSucc]),
io:format("NumErrs -> ~p~n", [NumErrs]);
do_wait(Pids, StartNow, StartTime, NumSucc, NumErrs) ->
log_msg("Starting spawning of workers...~n", []),
spawn_workers(Url, NumWorkers, NumReqsPerWorker),
log_msg("Finished spawning workers...~n", []),
do_wait(),
End_time = now(),
log_msg("All workers are done...~n", []),
log_msg("ibrowse_test_results table: ~n~p~n", [ets:tab2list(ibrowse_test_results)]),
log_msg("Start time: ~1000.p~n", [calendar:now_to_local_time(Start_time)]),
log_msg("End time : ~1000.p~n", [calendar:now_to_local_time(End_time)]),
Elapsed_time_secs = trunc(timer:now_diff(End_time, Start_time) / 1000000),
log_msg("Elapsed : ~p~n", [Elapsed_time_secs]),
log_msg("Reqs/sec : ~p~n", [(NumWorkers*NumReqsPerWorker) / Elapsed_time_secs]).
init_results() ->
ets:insert(ibrowse_test_results, {crash, 0}),
ets:insert(ibrowse_test_results, {send_failed, 0}),
ets:insert(ibrowse_test_results, {other_error, 0}),
ets:insert(ibrowse_test_results, {success, 0}),
ets:insert(ibrowse_test_results, {failed, 0}),
ets:insert(ibrowse_test_results, {timeout, 0}).
spawn_workers(_Url, 0, _) ->
ok;
spawn_workers(Url, NumWorkers, NumReqsPerWorker) ->
Pid = proc_lib:spawn_link(?MODULE, do_send_req, [Url, NumReqsPerWorker]),
ets:insert(pid_table, {Pid, []}),
spawn_workers(Url, NumWorkers - 1, NumReqsPerWorker).
do_wait() ->
receive
{done, From, _Time, {ChildNumSucc, ChildNumFail}} ->
do_wait(Pids--[From], StartNow, StartTime, NumSucc+ChildNumSucc, NumErrs+ChildNumFail);
{'EXIT',_, normal} ->
do_wait(Pids, StartNow, StartTime, NumSucc, NumErrs);
{'EXIT', From, _Reason} ->
do_wait(Pids--[From], StartNow, StartTime, NumSucc, NumErrs + get(num_reqs_per_worker))
{'EXIT', _, normal} ->
do_wait();
{'EXIT', Pid, Reason} ->
ets:delete(pid_table, Pid),
ets:insert(ibrowse_errors, {Pid, Reason}),
ets:update_counter(ibrowse_test_results, crash, 1),
do_wait();
Msg ->
io:format("Recvd unknown message...~p~n", [Msg]),
do_wait()
after 1000 ->
case ets:info(pid_table, size) of
0 ->
done;
_ ->
do_wait()
end
end.
do_send_req(Url, NumReqs, Parent) ->
StartTime = now(),
Res = do_send_req_1(Url, NumReqs, {0, 0}),
Parent ! {done, self(), StartTime, Res}.
do_send_req_1(_Url, 0, {NumSucc, NumFail}) ->
{NumSucc, NumFail};
do_send_req_1(Url, NumReqs, {NumSucc, NumFail}) ->
do_send_req(Url, NumReqs) ->
do_send_req_1(Url, NumReqs).
do_send_req_1(_Url, 0) ->
ets:delete(pid_table, self());
do_send_req_1(Url, NumReqs) ->
case ibrowse:send_req(Url, [], get, [], [], 10000) of
{ok, _Status, _Headers, _Body} ->
do_send_req_1(Url, NumReqs-1, {NumSucc+1, NumFail});
ets:update_counter(ibrowse_test_results, success, 1);
{error, req_timedout} ->
ets:update_counter(ibrowse_test_results, timeout, 1);
{error, send_failed} ->
ets:update_counter(ibrowse_test_results, send_failed, 1);
_Err ->
do_send_req_1(Url, NumReqs-1, {NumSucc, NumFail+1})
ets:update_counter(ibrowse_test_results, other_error, 1),
ok
end,
do_send_req_1(Url, NumReqs-1).
%%------------------------------------------------------------------------------
%% Unit Tests
%%------------------------------------------------------------------------------
-define(TEST_LIST, [{"http://intranet/messenger", get},
{"http://www.google.co.uk", get},
{"http://www.google.com", get},
{"http://www.google.com", options},
{"http://www.sun.com", get},
{"http://www.oracle.com", get},
{"http://www.bbc.co.uk", get},
{"http://www.bbc.co.uk", trace},
{"http://www.bbc.co.uk", options},
{"http://yaws.hyber.org", get},
{"http://jigsaw.w3.org/HTTP/ChunkedScript", get},
{"http://jigsaw.w3.org/HTTP/TE/foo.txt", get},
{"http://jigsaw.w3.org/HTTP/TE/bar.txt", get},
{"http://jigsaw.w3.org/HTTP/connection.html", get},
{"http://jigsaw.w3.org/HTTP/cc.html", get},
{"http://jigsaw.w3.org/HTTP/cc-private.html", get},
{"http://jigsaw.w3.org/HTTP/cc-proxy-revalidate.html", get},
{"http://jigsaw.w3.org/HTTP/cc-nocache.html", get},
{"http://jigsaw.w3.org/HTTP/h-content-md5.html", get},
{"http://jigsaw.w3.org/HTTP/h-retry-after.html", get},
{"http://jigsaw.w3.org/HTTP/h-retry-after-date.html", get},
{"http://jigsaw.w3.org/HTTP/neg", get},
{"http://jigsaw.w3.org/HTTP/negbad", get},
{"http://jigsaw.w3.org/HTTP/400/toolong/", get},
{"http://jigsaw.w3.org/HTTP/300/", get},
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
{"http://jigsaw.w3.org/HTTP/CL/", get}
]).
unit_tests() ->
unit_tests([]).
unit_tests(Options) ->
lists:foreach(fun({Url, Method}) ->
execute_req(Url, Method, Options);
({Url, Method, X_Opts}) ->
execute_req(Url, Method, X_Opts ++ Options)
end, ?TEST_LIST).
execute_req(Url, Method) ->
execute_req(Url, Method, []).
execute_req(Url, Method, Options) ->
io:format("~s, ~p: ", [Url, Method]),
Result = (catch ibrowse:send_req(Url, [], Method, [], Options)),
case Result of
{ok, SCode, _H, _B} ->
io:format("Status code: ~p~n", [SCode]);
Err ->
io:format("Err -> ~p~n", [Err])
end.
drv_ue_test() ->
@ -72,3 +179,7 @@ ue_test(Data) ->
io:format("Data Length -> ~p~n", [length(Data)]),
io:format("Res Length -> ~p~n", [length(Res)]).
% io:format("Result -> ~s~n", [Res]).
log_msg(Fmt, Args) ->
io:format("~s -- " ++ Fmt,
[ibrowse_lib:printable_date() | Args]).

+ 1
- 1
vsn.mk 查看文件

@ -1,2 +1,2 @@
IBROWSE_VSN = 1.3.1
IBROWSE_VSN = 1.4

Loading…
取消
儲存