Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

950 строк
38 KiB

20 лет назад
18 лет назад
11 лет назад
14 лет назад
20 лет назад
17 лет назад
20 лет назад
20 лет назад
20 лет назад
20 лет назад
17 лет назад
14 лет назад
13 лет назад
20 лет назад
17 лет назад
17 лет назад
20 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
13 лет назад
20 лет назад
20 лет назад
20 лет назад
11 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
18 лет назад
18 лет назад
14 лет назад
15 лет назад
11 лет назад
15 лет назад
20 лет назад
20 лет назад
18 лет назад
15 лет назад
12 лет назад
20 лет назад
20 лет назад
17 лет назад
17 лет назад
17 лет назад
17 лет назад
17 лет назад
11 лет назад
20 лет назад
17 лет назад
17 лет назад
14 лет назад
17 лет назад
17 лет назад
14 лет назад
20 лет назад
20 лет назад
20 лет назад
20 лет назад
20 лет назад
14 лет назад
13 лет назад
17 лет назад
13 лет назад
17 лет назад
13 лет назад
13 лет назад
13 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
17 лет назад
20 лет назад
20 лет назад
17 лет назад
17 лет назад
20 лет назад
20 лет назад
17 лет назад
17 лет назад
17 лет назад
17 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
17 лет назад
20 лет назад
20 лет назад
17 лет назад
20 лет назад
17 лет назад
17 лет назад
17 лет назад
20 лет назад
17 лет назад
  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse.erl
  3. %%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description : Load balancer process for HTTP client connections.
  5. %%%
  6. %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
  9. %% @copyright 2005-2014 Chandrashekhar Mullaparthi
  10. %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
  11. %% module implements the API of the HTTP client. There is one named
  12. %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
  13. %% one process to handle one TCP connection to a webserver
  14. %% (implemented in the module ibrowse_http_client). Multiple connections to a
  15. %% webserver are setup based on the settings for each webserver. The
  16. %% ibrowse process also determines which connection to pipeline a
  17. %% certain request on. The functions to call are send_req/3,
  18. %% send_req/4, send_req/5, send_req/6.
  19. %%
  20. %% <p>Here are a few sample invocations.</p>
  21. %%
  22. %% <code>
  23. %% ibrowse:send_req("http://intranet/messenger/", [], get).
  24. %% <br/><br/>
  25. %%
  26. %% ibrowse:send_req("http://www.google.com/", [], get, [],
  27. %% [{proxy_user, "XXXXX"},
  28. %% {proxy_password, "XXXXX"},
  29. %% {proxy_host, "proxy"},
  30. %% {proxy_port, 8080}], 1000).
  31. %% <br/><br/>
  32. %%
  33. %%ibrowse:send_req("http://www.erlang.org/download/otp_src_R10B-3.tar.gz", [], get, [],
  34. %% [{proxy_user, "XXXXX"},
  35. %% {proxy_password, "XXXXX"},
  36. %% {proxy_host, "proxy"},
  37. %% {proxy_port, 8080},
  38. %% {save_response_to_file, true}], 1000).
  39. %% <br/><br/>
  40. %%
  41. %% ibrowse:send_req("http://www.erlang.org", [], head).
  42. %%
  43. %% <br/><br/>
  44. %% ibrowse:send_req("http://www.sun.com", [], options).
  45. %%
  46. %% <br/><br/>
  47. %% ibrowse:send_req("http://www.bbc.co.uk", [], trace).
  48. %%
  49. %% <br/><br/>
  50. %% ibrowse:send_req("http://www.google.com", [], get, [],
  51. %% [{stream_to, self()}]).
  52. %% </code>
  53. %%
  54. -module(ibrowse).
  55. -behaviour(gen_server).
  56. %%--------------------------------------------------------------------
  57. %% Include files
  58. %%--------------------------------------------------------------------
  59. %%--------------------------------------------------------------------
  60. %% External exports
  61. -export([start_link/0, start/0, stop/0]).
  62. %% gen_server callbacks
  63. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  64. terminate/2, code_change/3]).
  65. %% API interface
  66. -export([
  67. rescan_config/0,
  68. rescan_config/1,
  69. add_config/1,
  70. get_config_value/1,
  71. get_config_value/2,
  72. spawn_worker_process/1,
  73. spawn_worker_process/2,
  74. spawn_link_worker_process/1,
  75. spawn_link_worker_process/2,
  76. stop_worker_process/1,
  77. send_req/3,
  78. send_req/4,
  79. send_req/5,
  80. send_req/6,
  81. send_req_direct/4,
  82. send_req_direct/5,
  83. send_req_direct/6,
  84. send_req_direct/7,
  85. stream_next/1,
  86. stream_close/1,
  87. set_max_sessions/3,
  88. set_max_pipeline_size/3,
  89. set_dest/3,
  90. trace_on/0,
  91. trace_off/0,
  92. trace_on/2,
  93. trace_off/2,
  94. all_trace_off/0,
  95. show_dest_status/0,
  96. show_dest_status/1,
  97. show_dest_status/2,
  98. get_metrics/0,
  99. get_metrics/2
  100. ]).
  101. -ifdef(debug).
  102. -compile(export_all).
  103. -endif.
  104. -import(ibrowse_lib, [
  105. parse_url/1,
  106. get_value/3,
  107. do_trace/2
  108. ]).
  109. -record(state, {trace = false}).
  110. -include("ibrowse.hrl").
  111. -include_lib("stdlib/include/ms_transform.hrl").
  112. -define(DEF_MAX_SESSIONS,10).
  113. -define(DEF_MAX_PIPELINE_SIZE,10).
  114. %%====================================================================
  115. %% External functions
  116. %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: start_link/0
  %% Description: Starts the server
  %%--------------------------------------------------------------------
  %% @doc Starts the ibrowse gen_server, registered locally under the
  %% module name, and links it to the calling process. Normally invoked
  %% by the ibrowse_sup supervisor.
  %% @spec start_link() -> {ok, pid()}
  start_link() ->
      Server_name = {local, ?MODULE},
      gen_server:start_link(Server_name, ?MODULE, [], []).
  %% @doc Starts the ibrowse gen_server without linking it to the caller,
  %% passing the `{debug, []}' gen_server option. Handy when testing from
  %% the shell.
  start() ->
      Start_opts = [{debug, []}],
      gen_server:start({local, ?MODULE}, ?MODULE, [], Start_opts).
  %% @doc Stops the ibrowse process. Handy when testing from the shell.
  %% Returns `ok' when no process is registered as `ibrowse'; otherwise
  %% returns whatever the `stop' call yields, or the `{'EXIT', Reason}'
  %% tuple produced by catching any other failure of the call.
  stop() ->
      Outcome = (catch gen_server:call(ibrowse, stop)),
      case Outcome of
          {'EXIT', {noproc, _}} -> ok;
          _ -> Outcome
      end.
  %% @doc The basic function for sending an HTTP request: no body and no
  %% options. The Status element of a successful response is the HTTP
  %% status code returned by the webserver.
  %% @spec send_req(Url::string(), Headers::headerList(), Method::method()) -> response()
  %% headerList() = [{header(), value()}]
  %% header() = atom() | string() | binary()
  %% value() = term()
  %% method() = get | post | head | options | put | delete | trace | mkcol | propfind | proppatch | lock | unlock | move | copy
  %% Status = string()
  %% ResponseHeaders = [respHeader()]
  %% respHeader() = {headerName(), headerValue()}
  %% headerName() = string()
  %% headerValue() = string()
  %% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody, RawRequest} | {ibrowse_req_id, req_id() } | {error, Reason}
  %% req_id() = term()
  %% ResponseBody = string() | {file, Filename}
  %% RawRequest = term()
  %% Reason = term()
  send_req(Url, Headers, Method) ->
      No_body = [],
      No_options = [],
      send_req(Url, Headers, Method, No_body, No_options).
  %% @doc Same as send_req/3, but with a request Body.
  %% A list body must be a flat list. The body can also be a fun/0 or a fun/1. <br/>
  %% With a fun/0, the connection handling process repeatedly calls the fun until it returns an error or eof. <pre>Fun() = {ok, Data} | eof</pre><br/>
  %% With a fun/1, the connection handling process repeatedly calls the fun with the supplied state until it returns an error or eof. <pre>Fun(State) = {ok, Data} | {ok, Data, NewState} | eof</pre>
  %% @spec send_req(Url, Headers, Method::method(), Body::body()) -> response()
  %% body() = [] | string() | binary() | fun_arity_0() | {fun_arity_1(), initial_state()}
  %% initial_state() = term()
  send_req(Url, Headers, Method, Body) ->
      Default_options = [],
      send_req(Url, Headers, Method, Body, Default_options).
  %% @doc Same as send_req/4, with an explicit list of request Options.
  %% Uses a default timeout of 30000 milliseconds.
  %% For a description of SSL options, see the <a href="http://www.erlang.org/doc/apps/ssl/index.html">ssl</a> manpage.
  %% For a description of process options, see the <a href="http://www.erlang.org/doc/man/gen_server.html">gen_server</a> manpage.
  %% If the HTTP version to use is not specified, the default is 1.1.
  %% <br/>
  %% <ul>
  %% <li>The <code>host_header</code> option is useful when ibrowse is
  %% connecting to a component such as <a
  %% href="http://www.stunnel.org">stunnel</a>, which then sets up a
  %% secure connection to a webserver. In this case the URL supplied to
  %% ibrowse must carry the stunnel host/port details, which mean nothing
  %% to the destination webserver. This option specifies what should go
  %% in the <code>Host</code> header of the request instead.</li>
  %%
  %% <li>The <code>stream_to</code> option has the HTTP response streamed
  %% to a process as messages, as data arrives on the socket. If the
  %% calling process wishes to control the rate at which data is received
  %% from the server, specify <code>{stream_to, {process(), once}}</code>.
  %% The calling process then has to invoke
  %% <code>ibrowse:stream_next(Request_id)</code> to receive the next
  %% packet.</li>
  %%
  %% <li>When both <code>save_response_to_file</code> and
  %% <code>stream_to</code> are specified, the former takes precedence.</li>
  %%
  %% <li>With <code>save_response_to_file</code>, the response body is
  %% saved to a file only if the status code is in the 200-299 range.
  %% Otherwise the response body is returned as a string.</li>
  %%
  %% <li>Whenever an error occurs while processing a request, ibrowse
  %% returns as much information as it has, such as the HTTP status code
  %% and HTTP headers. In that case the response has the form
  %% <code>{error, {Reason, {stat_code, StatusCode}, HTTP_headers}}</code>.</li>
  %%
  %% <li>The <code>inactivity_timeout</code> option is useful when
  %% dealing with large response bodies and/or slow links, where it is
  %% hard to estimate how long a request will take. The client can
  %% instead time out if no data has been received on the link for a
  %% certain interval.
  %%
  %% This value is also used to close connections that have not been
  %% used for that long.
  %% </li>
  %%
  %% <li>
  %% The <code>connect_timeout</code> option specifies how long the
  %% client process should wait for the connection to be established.
  %% This is useful when connections are usually set up very fast but
  %% responses may take much longer. In such cases it is better for the
  %% calling process to time out quickly if there is a connection problem
  %% (DNS lookup delays/failures, network routing issues, etc). The total
  %% timeout specified for the request is still enforced. For example:
  %% <code>
  %% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000).
  %% </code>
  %% Here, if the connection isn't established within 100 milliseconds,
  %% the request fails with <code>{error, conn_failed}</code>.<br/>
  %% If connection setup succeeds, the request has 1000 milliseconds minus
  %% the time taken for connection setup to complete.
  %% </li>
  %%
  %% <li>The <code>socket_options</code> option sets specific options on
  %% the socket. The <code>{active, true | false | once}</code> and
  %% <code>{packet_type, Packet_type}</code> options are filtered out by
  %% ibrowse.</li>
  %%
  %% <li>The <code>headers_as_is</code> option lets the caller send
  %% headers exactly as specified in the request, without ibrowse adding
  %% any of its own. Some picky servers apparently require this.</li>
  %%
  %% <li>The <code>give_raw_headers</code> option gives the caller access
  %% to the raw status line and the raw, unparsed headers.</li>
  %%
  %% <li>The <code>preserve_chunked_encoding</code> option lets the caller
  %% receive the raw data stream when the server response uses chunked
  %% Transfer-Encoding.
  %% </li>
  %% <li>The <code>return_raw_request</code> option lets the caller get
  %% the exact request ibrowse sent to the server, along with the
  %% response. With this option, the response for synchronous requests is
  %% a 5-tuple instead of the usual 4-tuple. For asynchronous requests,
  %% the calling process gets a message
  %% <code>{ibrowse_async_raw_req, Raw_req}</code>.
  %% </li>
  %% </ul>
  %%
  %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
  %% optionList() = [option()]
  %% option() = {max_sessions, integer()} |
  %% {response_format,response_format()}|
  %% {stream_chunk_size, integer()} |
  %% {max_pipeline_size, integer()} |
  %% {trace, boolean()} |
  %% {is_ssl, boolean()} |
  %% {ssl_options, [SSLOpt]} |
  %% {pool_name, atom()} |
  %% {proxy_host, string()} |
  %% {proxy_port, integer()} |
  %% {proxy_user, string()} |
  %% {proxy_password, string()} |
  %% {use_absolute_uri, boolean()} |
  %% {basic_auth, {username(), password()}} |
  %% {cookie, string()} |
  %% {content_length, integer()} |
  %% {content_type, string()} |
  %% {save_response_to_file, srtf()} |
  %% {stream_to, stream_to()} |
  %% {http_vsn, {MajorVsn, MinorVsn}} |
  %% {host_header, string()} |
  %% {inactivity_timeout, integer()} |
  %% {connect_timeout, integer()} |
  %% {socket_options, Sock_opts} |
  %% {transfer_encoding, {chunked, ChunkSize}} |
  %% {headers_as_is, boolean()} |
  %% {give_raw_headers, boolean()} |
  %% {preserve_chunked_encoding,boolean()} |
  %% {workaround, head_response_with_body} |
  %% {worker_process_options, list()} |
  %% {return_raw_request, true}
  %%
  %% stream_to() = process() | {process(), once}
  %% process() = pid() | atom()
  %% username() = string()
  %% password() = string()
  %% SSLOpt = term()
  %% Sock_opts = [Sock_opt]
  %% Sock_opt = term()
  %% ChunkSize = integer()
  %% srtf() = boolean() | filename() | {append, filename()}
  %% filename() = string()
  %% response_format() = list | binary
  send_req(Url, Headers, Method, Body, Options) ->
      Default_timeout_ms = 30000,
      send_req(Url, Headers, Method, Body, Options, Default_timeout_ms).
  %% @doc Same as send_req/5, with an explicit Timeout.
  %% All timeout values are in milliseconds.
  %% The URL is parsed first; if parsing fails (by exception or by
  %% returning something other than a #url record) the result is
  %% {error, {url_parsing_failed, Err}}. Otherwise the load balancer for
  %% Host:Port is looked up in the ibrowse_lb table (or requested from the
  %% ibrowse server), configured defaults are merged into Options, and the
  %% request is routed via try_routing_request/11.
  %% @spec send_req(Url, Headers::headerList(), Method::method(), Body::body(), Options::optionList(), Timeout) -> response()
  %% Timeout = integer() | infinity
  send_req(Url, Headers, Method, Body, Options, Timeout) ->
      case catch parse_url(Url) of
          #url{host = Host, port = Port, protocol = Protocol} = Parsed_url ->
              Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
                           [#lb_pid{pid = Known_lb}] -> Known_lb;
                           [] -> get_lb_pid(Parsed_url)
                       end,
              Max_sess = get_max_sessions(Host, Port, Options),
              Max_pipe = get_max_pipeline_size(Host, Port, Options),
              Merged_opts = merge_options(Host, Port, Options),
              %% SSL is used for https URLs or when {is_ssl, true} is given.
              Ssl = case (Protocol == https) orelse
                        get_value(is_ssl, Merged_opts, false) of
                        true -> {get_value(ssl_options, Merged_opts, []), true};
                        false -> {[], false}
                    end,
              try_routing_request(Lb_pid, Parsed_url, Max_sess, Max_pipe, Ssl,
                                  Headers, Method, Body, Merged_opts, Timeout, 0);
          Parse_failure ->
              {error, {url_parsing_failed, Parse_failure}}
      end.
  %% Asks the load balancer for a connection process and sends the
  %% request on it. If the chosen connection turns out to be closed
  %% ({error, sel_conn_closed}), a fresh connection is requested and the
  %% request is retried; once Try_count reaches 3 the result is
  %% {error, retry_later}. Any non-{ok, Pid} reply from
  %% ibrowse_lb:spawn_connection/6 is returned unchanged.
  try_routing_request(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size,
                      {_, _} = Ssl, Headers, Method, Body, Options, Timeout,
                      Try_count) when Try_count < 3 ->
      Proc_opts = get_value(worker_process_options, Options, []),
      Spawned = ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
                                            Max_sessions, Max_pipeline_size,
                                            Ssl, Proc_opts),
      case Spawned of
          {ok, Conn_pid} ->
              Result = do_send_req(Conn_pid, Parsed_url, Headers,
                                   Method, Body, Options, Timeout),
              case Result of
                  {error, sel_conn_closed} ->
                      try_routing_request(Lb_pid, Parsed_url,
                                          Max_sessions, Max_pipeline_size,
                                          Ssl, Headers, Method, Body,
                                          Options, Timeout, Try_count + 1);
                  _ ->
                      Result
              end;
          Spawn_error ->
              Spawn_error
      end;
  try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
      {error, retry_later}.
  %% Merges configured default options into the per-request Options.
  %% Precedence, highest first: the explicit Options, then the
  %% {options, Host, Port} configuration, then {options, global}.
  %% A configured {Key, Val} is added only if no higher-precedence source
  %% already supplied Key, so each key appears at most once.
  merge_options(Host, Port, Options) ->
      Config_options = get_config_value({options, Host, Port}, []) ++
          get_config_value({options, global}, []),
      lists:foldl(
        fun({Key, _Val} = Opt, Acc) ->
                %% Check the accumulator, not just Options. Otherwise a
                %% global entry would be prepended ahead of a host-specific
                %% entry with the same key, and first-match lookups
                %% (get_value/3) would wrongly prefer the global value.
                case lists:keymember(Key, 1, Acc) of
                    true -> Acc;
                    false -> [Opt | Acc]
                end
        end, Options, Config_options).
  %% Asks the ibrowse server (synchronous gen_server call) for the
  %% load-balancer pid for the destination of the parsed Url.
  get_lb_pid(Url) ->
      Request = {get_lb_pid, Url},
      gen_server:call(?MODULE, Request).
  %% Max sessions for Host:Port: the max_sessions request option if given,
  %% else the {max_sessions, Host, Port} config value, else the app default.
  get_max_sessions(Host, Port, Options) ->
      Fallback = get_config_value({max_sessions, Host, Port},
                                  default_max_sessions()),
      get_value(max_sessions, Options, Fallback).
  %% Max pipeline size for Host:Port: the max_pipeline_size request option
  %% if given, else the {max_pipeline_size, Host, Port} config value, else
  %% the app default.
  get_max_pipeline_size(Host, Port, Options) ->
      Fallback = get_config_value({max_pipeline_size, Host, Port},
                                  default_max_pipeline_size()),
      get_value(max_pipeline_size, Options, Fallback).
  %% Application-wide default for max_sessions: the ibrowse env key
  %% `default_max_sessions', or ?DEF_MAX_SESSIONS when it is unset.
  default_max_sessions() ->
      Env_key = default_max_sessions,
      safe_get_env(ibrowse, Env_key, ?DEF_MAX_SESSIONS).
  %% Application-wide default for max_pipeline_size: the ibrowse env key
  %% `default_max_pipeline_size', or ?DEF_MAX_PIPELINE_SIZE when unset.
  default_max_pipeline_size() ->
      Env_key = default_max_pipeline_size,
      safe_get_env(ibrowse, Env_key, ?DEF_MAX_PIPELINE_SIZE).
  %% Returns the application environment value App/Key, or Def_val when
  %% the key is not set.
  safe_get_env(App, Key, Def_val) ->
      Lookup = application:get_env(App, Key),
      case Lookup of
          {ok, Configured} -> Configured;
          undefined -> Def_val
      end.
  %% @doc Deprecated. Use set_max_sessions/3, set_max_pipeline_size/3 and
  %% trace_on/2 / trace_off/2 to achieve the same effect.
  %% Applies each option in the list, in order, to Host:Port. Supported
  %% options are {max_sessions, Max}, {max_pipeline_size, Max} and
  %% {trace, boolean()}. Any other entry exits with {invalid_option, Opt}.
  %% Returns ok once all options have been applied.
  set_dest(Host, Port, [{max_sessions, Max} | T]) ->
      set_max_sessions(Host, Port, Max),
      set_dest(Host, Port, T);
  set_dest(Host, Port, [{max_pipeline_size, Max} | T]) ->
      set_max_pipeline_size(Host, Port, Max),
      set_dest(Host, Port, T);
  set_dest(Host, Port, [{trace, Bool} | T]) when is_boolean(Bool) ->
      %% Forward the requested flag. It used to always send `true', so
      %% {trace, false} switched tracing on.
      ibrowse ! {trace, Bool, Host, Port},
      set_dest(Host, Port, T);
  set_dest(_Host, _Port, [H | _]) ->
      exit({invalid_option, H});
  set_dest(_, _, []) ->
      ok.
  %% @doc Sets the maximum number of connections allowed to Host:Port by
  %% storing {max_sessions, Host, Port} in the ibrowse configuration.
  %% Max must be a positive integer.
  %% @spec set_max_sessions(Host::string(), Port::integer(), Max::integer()) -> ok
  set_max_sessions(Host, Port, Max) when is_integer(Max), Max > 0 ->
      Config_key = {max_sessions, Host, Port},
      gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
  %% @doc Sets the maximum pipeline size for each connection to Host:Port
  %% by storing {max_pipeline_size, Host, Port} in the ibrowse
  %% configuration. Max must be a positive integer.
  %% @spec set_max_pipeline_size(Host::string(), Port::integer(), Max::integer()) -> ok
  set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 ->
      Config_key = {max_pipeline_size, Host, Port},
      gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
  %% Sends one request on the connection process Conn_Pid via
  %% ibrowse_http_client:send_req/7 and normalises the outcome:
  %%  - an exit with reason {timeout, _}            -> {error, req_timedout}
  %%  - noproc on the call to Conn_Pid, a {normal, _} or
  %%    {connection_closed, _} exit, or {error, connection_closed}
  %%                                                -> {error, sel_conn_closed}
  %%    (the caller may retry on another connection)
  %%  - any other exit                              -> {error, {'EXIT', Reason}}
  %%  - a successful response whose body is a binary is converted to a
  %%    list unless the response_format option is `binary'
  %%  - anything else is returned unchanged.
  do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
      case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url,
                                              Headers, Method, ensure_bin(Body),
                                              Options, Timeout) of
          {'EXIT', {timeout, _}} ->
              {error, req_timedout};
          {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
              {error, sel_conn_closed};
          {'EXIT', {normal, _}} ->
              {error, sel_conn_closed};
          {'EXIT', {connection_closed, _}} ->
              {error, sel_conn_closed};
          {error, connection_closed} ->
              {error, sel_conn_closed};
          {'EXIT', Reason} ->
              {error, {'EXIT', Reason}};
          %% Response headers/body must be fresh variables. Headers and Body
          %% are already bound to the *request's* values, and a bound
          %% variable in a case pattern is an equality test, which made
          %% these clauses effectively unmatchable.
          {ok, St_code, Resp_headers, Resp_body} = Ret when is_binary(Resp_body) ->
              case get_value(response_format, Options, list) of
                  list ->
                      {ok, St_code, Resp_headers, binary_to_list(Resp_body)};
                  binary ->
                      Ret
              end;
          {ok, St_code, Resp_headers, Resp_body, Raw_req} = Ret when is_binary(Resp_body) ->
              case get_value(response_format, Options, list) of
                  list ->
                      {ok, St_code, Resp_headers, binary_to_list(Resp_body), Raw_req};
                  binary ->
                      Ret
              end;
          Ret ->
              Ret
      end.
  %% Normalises a request body for the connection process: lists become
  %% binaries, binaries and funs pass through, a 1-tuple {Fun} is
  %% unwrapped to Fun, and a {Fun, State} pair is returned as is.
  ensure_bin(Bin) when is_binary(Bin) -> Bin;
  ensure_bin(Chars) when is_list(Chars) -> list_to_binary(Chars);
  ensure_bin({Fun, _State} = Fun_with_state) when is_function(Fun) -> Fun_with_state;
  ensure_bin({Fun}) when is_function(Fun) -> Fun;
  ensure_bin(Fun) when is_function(Fun) -> Fun.
  %% @doc Creates an HTTP client process for the given URL or
  %% {Host, Port} that is not part of the load-balancing pool. This is
  %% useful when some requests to a webserver take a long time and others
  %% are quick: to keep the quick requests from getting stuck in a pipeline
  %% behind slow ones, use this function to get a dedicated connection
  %% process. <br/>
  %% <b>Note:</b> Calling this function only creates a worker process; no
  %% connection is set up. The connection attempt is made only when the
  %% first request is sent via any of the send_req_direct/4,5,6,7
  %% functions.<br/>
  %% <b>Note:</b> The calling process is responsible for controlling the
  %% pipeline size on such connections.
  %% @spec spawn_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
  spawn_worker_process(Args) ->
      No_options = [],
      spawn_worker_process(Args, No_options).
  %% @doc Same as spawn_worker_process/1, but with Options passed through
  %% to ibrowse_http_client:start/2 (Erlang process options).
  %% The legacy form spawn_worker_process(Host, Port), with a string host
  %% and integer port, is also accepted and uses empty options.
  %% @spec spawn_worker_process(Url::string() | {Host::string(), Port::integer()}, Options::list()) -> {ok, pid()}
  spawn_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
      %% Old (Host, Port) API: translate to ({Host, Port}, []).
      ibrowse_http_client:start({Host, Port}, []);
  spawn_worker_process(Args, Options) ->
      ibrowse_http_client:start(Args, Options).
  %% @doc Same as spawn_worker_process/1, except that the calling process
  %% is linked to the spawned worker process.
  %% @spec spawn_link_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
  spawn_link_worker_process(Args) ->
      No_options = [],
      spawn_link_worker_process(Args, No_options).
  %% @doc Same as spawn_link_worker_process/1, but with Options passed
  %% through to ibrowse_http_client:start_link/2 (Erlang process options).
  %% The legacy form spawn_link_worker_process(Host, Port), with a string
  %% host and integer port, is also accepted and uses empty options.
  %% @spec spawn_link_worker_process(Url::string() | {Host::string(), Port::integer()}, Options::list()) -> {ok, pid()}
  spawn_link_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
      %% Old (Host, Port) API: translate to ({Host, Port}, []).
      ibrowse_http_client:start_link({Host, Port}, []);
  spawn_link_worker_process(Args, Options) ->
      ibrowse_http_client:start_link(Args, Options).
  %% @doc Terminates a worker process created with spawn_worker_process/1,2
  %% or spawn_link_worker_process/1,2, by delegating to
  %% ibrowse_http_client:stop/1. Requests in progress get the error
  %% response <pre>{error, closing_on_request}</pre>
  %% @spec stop_worker_process(Conn_pid::pid()) -> ok
  stop_worker_process(Worker) ->
      ibrowse_http_client:stop(Worker).
  %% @doc Same as send_req/3, except that the first argument is the pid
  %% returned by spawn_worker_process/1,2 or spawn_link_worker_process/1,2.
  send_req_direct(Conn_pid, Url, Headers, Method) ->
      No_body = [],
      No_options = [],
      send_req_direct(Conn_pid, Url, Headers, Method, No_body, No_options).
  %% @doc Same as send_req/4, except that the first argument is the pid
  %% returned by spawn_worker_process/1,2 or spawn_link_worker_process/1,2.
  send_req_direct(Conn_pid, Url, Headers, Method, Body) ->
      No_options = [],
      send_req_direct(Conn_pid, Url, Headers, Method, Body, No_options).
  %% @doc Same as send_req/5, except that the first argument is the pid
  %% returned by spawn_worker_process/1,2 or spawn_link_worker_process/1,2.
  %% Uses a default timeout of 30000 milliseconds.
  send_req_direct(Conn_pid, Url, Headers, Method, Body, Options) ->
      Default_timeout_ms = 30000,
      send_req_direct(Conn_pid, Url, Headers, Method, Body, Options,
                      Default_timeout_ms).
  %% @doc Same as send_req/6, except that the first argument is the pid
  %% returned by spawn_worker_process/1,2 or spawn_link_worker_process/1,2.
  %% Configured defaults for the URL's Host:Port are merged into Options,
  %% as in send_req/6. A URL that fails to parse yields
  %% {error, {url_parsing_failed, Err}}.
  %% NOTE(review): do_send_req/7 already maps a noproc exit on the call to
  %% Conn_pid to {error, sel_conn_closed}, so the worker_is_dead branch
  %% below is likely reached only in unusual cases. Confirm before relying
  %% on it.
  send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
      case catch parse_url(Url) of
          #url{host = Host, port = Port} = Parsed_url ->
              Merged_opts = merge_options(Host, Port, Options),
              Result = do_send_req(Conn_pid, Parsed_url, Headers, Method,
                                   Body, Merged_opts, Timeout),
              case Result of
                  {error, {'EXIT', {noproc, _}}} -> {error, worker_is_dead};
                  _ -> Result
              end;
          Parse_failure ->
              {error, {url_parsing_failed, Parse_failure}}
      end.
  %% @doc Asks ibrowse to stream the next chunk of data for Req_id to the
  %% caller. Intended for use with the <code>{stream_to, {Pid, once}}</code>
  %% option. Looks up the connection process for Req_id in the
  %% ibrowse_stream table and sends it {stream_next, Req_id}; a failed send
  %% is ignored.
  %% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
  stream_next(Req_id) ->
      Lookup_key = {req_id_pid, Req_id},
      case ets:lookup(ibrowse_stream, Lookup_key) of
          [{_, Conn_pid}] ->
              catch Conn_pid ! {stream_next, Req_id},
              ok;
          [] ->
              {error, unknown_req_id}
      end.
  %% @doc Asks ibrowse to close the connection serving the stream Req_id.
  %% Intended for use with the <code>stream_to</code> option. All requests
  %% in progress on that connection are aborted and get an error. Looks up
  %% the connection process in the ibrowse_stream table and sends it
  %% {stream_close, Req_id}; a failed send is ignored.
  %% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id}
  stream_close(Req_id) ->
      Lookup_key = {req_id_pid, Req_id},
      case ets:lookup(ibrowse_stream, Lookup_key) of
          [{_, Conn_pid}] ->
              catch Conn_pid ! {stream_close, Req_id},
              ok;
          [] ->
              {error, unknown_req_id}
      end.
  %% @doc Turns tracing on for the ibrowse process by sending it
  %% {trace, true}. Returns the message sent.
  trace_on() ->
      erlang:send(ibrowse, {trace, true}).
  %% @doc Turns tracing off for the ibrowse process by sending it
  %% {trace, false}. Returns the message sent.
  trace_off() ->
      erlang:send(ibrowse, {trace, false}).
  %% @doc Turns tracing on for all connections to the given HTTP server.
  %% Host is whatever appears as the domain name in the URL.
  %% @spec trace_on(Host, Port) -> ok
  %% Host = string()
  %% Port = integer()
  trace_on(Host, Port) ->
      erlang:send(ibrowse, {trace, true, Host, Port}),
      ok.
  %% @doc Turns tracing off for all connections to the given HTTP server.
  %% @spec trace_off(Host, Port) -> ok
  trace_off(Host, Port) ->
      erlang:send(ibrowse, {trace, false, Host, Port}),
      ok.
  %% @doc Turns off all tracing by sending `all_trace_off' to the ibrowse
  %% process.
  %% @spec all_trace_off() -> ok
  all_trace_off() ->
      erlang:send(ibrowse, all_trace_off),
      ok.
  %% @doc Prints a table of load-balancing state for every destination
  %% reported by get_metrics/0: server:port, the size of the load
  %% balancer's ETS table (shown as "Num conns"), the load-balancer pid and
  %% the ETS table id. Workers spawned with spawn_worker_process/1,2 or
  %% spawn_link_worker_process/1,2 are not included.
  show_dest_status() ->
      %% The ETS column is last and unbounded. Tid may be the atom
      %% `unknown' or a table reference, neither of which fits
      %% integer_to_list/1 (which used to crash here) or a narrow fixed-width
      %% column.
      Row_fmt = "~-40.40s | ~-10.10s | ~-15.15s | ~s~n",
      io:format(Row_fmt, ["Server:port", "Num conns", "LB Pid", "ETS"]),
      io:format("~80.80.=s~n", [""]),
      To_str = fun(Term) -> lists:flatten(io_lib:format("~p", [Term])) end,
      lists:foreach(
        fun({Host, Port, Lb_pid, Tid, Size}) ->
                io:format(Row_fmt,
                          [Host ++ ":" ++ integer_to_list(Port),
                           integer_to_list(Size),
                           To_str(Lb_pid),
                           To_str(Tid)])
        end, get_metrics()).
  %% @doc Same as show_dest_status/2, with the host and port taken from
  %% Url via ibrowse_lib:parse_url/1. Crashes with badmatch if the URL
  %% does not parse into a #url record.
  show_dest_status(Url) ->
      #url{host = Dest_host, port = Dest_port} = ibrowse_lib:parse_url(Url),
      show_dest_status(Dest_host, Dest_port).
  %% @doc Prints load-balancing details for the destination Host:Port, as
  %% returned by get_metrics/2: load-balancer pid, its message-queue info,
  %% its ETS table id, the number of entries, and the first/last table
  %% entries (labelled smallest/largest pipeline). Prints
  %% "Metrics not available" for any other result. Workers spawned with
  %% spawn_worker_process/1,2 or spawn_link_worker_process/1,2 are not
  %% included.
  show_dest_status(Host, Port) ->
      Metrics = get_metrics(Host, Port),
      case Metrics of
          {Lb_pid, Msg_q_size, Tid, Size,
           {{First_p_sz, First_spec_sz}, {Last_p_sz, Last_spec_sz}}} ->
              Report_fmt = "Load Balancer Pid : ~p~n"
                  "LB process msg q size : ~p~n"
                  "LB ETS table id : ~p~n"
                  "Num Connections : ~p~n"
                  "Smallest pipeline : ~p:~p~n"
                  "Largest pipeline : ~p:~p~n",
              io:format(Report_fmt,
                        [Lb_pid, Msg_q_size, Tid, Size,
                         First_p_sz, First_spec_sz,
                         Last_p_sz, Last_spec_sz]);
          _Other ->
              io:format("Metrics not available~n", [])
      end.
  %% Builds one summary row per entry in the ibrowse_lb table whose key is
  %% a {Host, Port} pair with a string host and an integer port. Each row
  %% is {Host, Port, Lb_pid, Tid, Size}: Tid is the first ETS table, in
  %% ets:all/0 order, owned by Lb_pid (or `unknown' if it owns none), and
  %% Size is that table's size (0 when unknown or not an integer).
  get_metrics() ->
      Lb_entries = ets:tab2list(ibrowse_lb),
      All_tables = ets:all(),
      Table_size = fun(Tab) ->
                           case catch ets:info(Tab, size) of
                               N when is_integer(N) -> N;
                               _ -> 0
                           end
                   end,
      [case [T || T <- All_tables, ets:info(T, owner) =:= Lb_pid] of
           [] -> {Host, Port, Lb_pid, unknown, 0};
           [Tid | _] -> {Host, Port, Lb_pid, Tid, Table_size(Tid)}
       end
       || {lb_pid, {Host, Port}, Lb_pid} <- Lb_entries,
          is_list(Host), is_integer(Port)].
  %% Returns load-balancing metrics for Host:Port:
  %%  - no_active_processes when ibrowse_lb has no entry for {Host, Port};
  %%  - {Lb_pid, MsgQueueSize, unknown, 0, unknown} when Lb_pid owns no
  %%    ETS table;
  %%  - {Lb_pid, MsgQueueSize, Tid, 0, unknown} when its table is empty;
  %%  - {Lb_pid, MsgQueueSize, Tid, Size, {{FirstP, FirstSpec}, {LastP, LastSpec}}}
  %%    built from the first and last table entries (displayed by
  %%    show_dest_status/2 as smallest/largest pipeline; this presumes the
  %%    table is ordered by pipeline size, so confirm in ibrowse_lb);
  %%  - not_available if reading the table fails.
  %% MsgQueueSize is the caught result of process_info(Lb_pid,
  %% message_queue_len).
  get_metrics(Host, Port) ->
      case ets:lookup(ibrowse_lb, {Host, Port}) of
          [] ->
              no_active_processes;
          [#lb_pid{pid = Lb_pid}] ->
              MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
              case lists:dropwhile(
                     fun(Tid) ->
                             ets:info(Tid, owner) /= Lb_pid
                     end, ets:all()) of
                  [] ->
                      {Lb_pid, MsgQueueSize, unknown, 0, unknown};
                  [Tid | _] ->
                      %% Best effort: the table may vanish or change while
                      %% being read, in which case report not_available.
                      try
                          case ets:info(Tid, size) of
                              0 ->
                                  %% Previously this returned the bare atom
                                  %% `ok'. Keep the same shape as the other
                                  %% branches.
                                  {Lb_pid, MsgQueueSize, Tid, 0, unknown};
                              Size ->
                                  First = ets:first(Tid),
                                  Last = ets:last(Tid),
                                  [{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First),
                                  [{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last),
                                  {Lb_pid, MsgQueueSize, Tid, Size,
                                   {{First_p_sz, First_speculative_sz},
                                    {Last_p_sz, Last_speculative_sz}}}
                          end
                      catch _:_ ->
                              not_available
                      end
              end
      end.
  %% @doc Asks the ibrowse server to rescan its configuration (synchronous
  %% `rescan_config' call). As originally documented, this clears the
  %% current configuration and reloads it from ibrowse.conf in the
  %% ibrowse priv directory, and the configuration is cleared only if that
  %% file is readable with file:consult/1.
  rescan_config() ->
      Request = rescan_config,
      gen_server:call(?MODULE, Request).
  %% @doc Replaces the ibrowse configuration. When given a non-empty list
  %% of 2-tuples, those terms are sent as {rescan_config_terms, Terms}.
  %% When given any other list, it is treated as a file name and sent as
  %% {rescan_config, File}. As originally documented, the configuration is
  %% cleared only if the file is readable with file:consult/1.
  rescan_config([{_, _} | _] = Terms) ->
      gen_server:call(?MODULE, {rescan_config_terms, Terms});
  rescan_config(File) when is_list(File) ->
      gen_server:call(?MODULE, {rescan_config, File}).
  %% @doc Add configuration terms at runtime without clearing the existing
  %% configuration. Terms is a non-empty list of tuples: {Key, Value}
  %% entries and {dest, Host, Port, MaxSess, MaxPipe, Options} entries, in
  %% any order. The list may start with any tuple, including a `dest' entry.
  add_config([First | _] = Terms) when is_tuple(First) ->
      gen_server:call(?MODULE, {add_config_terms, Terms}).
  677. %%====================================================================
  678. %% Server functions
  679. %%====================================================================
  680. %%--------------------------------------------------------------------
  681. %% Function: init/1
  682. %% Description: Initiates the server
  683. %% Returns: {ok, State} |
  684. %% {ok, State, Timeout} |
  685. %% ignore |
  686. %% {stop, Reason}
  687. %%--------------------------------------------------------------------
  %% gen_server init/1 callback.
  %% Traps exits, seeds the process-dictionary trace settings, creates the
  %% three named ETS tables owned by this server (ibrowse_lb and
  %% ibrowse_stream are public, ibrowse_conf is protected), and then loads
  %% the initial configuration from ibrowse.conf.
  init(_) ->
      process_flag(trap_exit, true),
      Defaults = #state{},
      %% Trace state is kept in the process dictionary; presumably read by
      %% do_trace/2 -- TODO confirm.
      put(my_trace_flag, Defaults#state.trace),
      put(ibrowse_trace_token, "ibrowse"),
      Tables = [{ibrowse_lb, [named_table, public, {keypos, 2}]},
                {ibrowse_conf, [named_table, protected, {keypos, 2}]},
                {ibrowse_stream, [named_table, public]}],
      %% Assert that each named table was created under its own name.
      lists:foreach(fun({Name, Opts}) -> Name = ets:new(Name, Opts) end,
                    Tables),
      import_config(),
      {ok, #state{}}.
  %% Load the configuration from ibrowse.conf in the ibrowse application's
  %% priv directory. Returns ok without touching the configuration when the
  %% priv directory cannot be determined.
  import_config() ->
      PrivDirResult = code:priv_dir(ibrowse),
      case PrivDirResult of
          {error, _Reason} ->
              ok;
          Dir ->
              import_config(filename:join(Dir, "ibrowse.conf"))
      end.
  %% Replace the configuration with the terms read from Filename.
  %% This is best effort: if file:consult/1 does not return {ok, Terms}, the
  %% current configuration is kept and ok is returned.
  import_config(Filename) ->
      Consulted = file:consult(Filename),
      case Consulted of
          {ok, ConfigTerms} ->
              apply_config(ConfigTerms);
          _Unreadable ->
              ok
      end.
  %% Replace the entire configuration: empty the ibrowse_conf table, then
  %% insert NewTerms with insert_config/1.
  apply_config(NewTerms) ->
      ets:delete_all_objects(ibrowse_conf),
      insert_config(NewTerms).
  %% Insert configuration Terms into ibrowse_conf without clearing it.
  %% Recognised terms:
  %%   {dest, Host, Port, MaxSess, MaxPipe, Options}
  %%       with Host a list, Port an integer, MaxSess and MaxPipe positive
  %%       integers and Options a list. Stored as three entries keyed
  %%       {max_sessions, Host, Port}, {max_pipeline_size, Host, Port} and
  %%       {options, Host, Port};
  %%   {Key, Value}
  %%       stored as-is under Key.
  %% Any other term (including a malformed dest tuple) is printed with
  %% io:format/2 and skipped.
  insert_config(Terms) ->
      InsertTerm =
          fun({dest, Host, Port, MaxSess, MaxPipe, Options})
                when is_list(Host), is_integer(Port),
                     is_integer(MaxSess), MaxSess > 0,
                     is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
                  Entries = [{{max_sessions, Host, Port}, MaxSess},
                             {{max_pipeline_size, Host, Port}, MaxPipe},
                             {{options, Host, Port}, Options}],
                  ets:insert(ibrowse_conf,
                             [#ibrowse_conf{key = EntryKey, value = EntryVal}
                              || {EntryKey, EntryVal} <- Entries]);
             ({Key, Value}) ->
                  ets:insert(ibrowse_conf,
                             #ibrowse_conf{key = Key, value = Value});
             (Unrecognised) ->
                  io:format("Skipping unrecognised term: ~p~n", [Unrecognised])
          end,
      lists:foreach(InsertTerm, Terms).
  %% @doc Internal export
  %% Return the value stored under Key in the ibrowse_conf table.
  %% Fails with {badmatch, []} when Key is absent. Throws
  %% {error, ibrowse_not_running} when ets:lookup/2 raises badarg (for
  %% example because the ibrowse_conf table does not exist).
  get_config_value(Key) ->
      try ets:lookup(ibrowse_conf, Key) of
          Found ->
              %% Only the lookup is protected; a missing key still crashes.
              [#ibrowse_conf{value = Value}] = Found,
              Value
      catch
          error:badarg ->
              throw({error, ibrowse_not_running})
      end.
  %% @doc Internal export
  %% Return the value stored under Key in the ibrowse_conf table, or DefVal
  %% when there is no entry for Key. Throws {error, ibrowse_not_running}
  %% when ets:lookup/2 raises badarg (for example because the ibrowse_conf
  %% table does not exist).
  get_config_value(Key, DefVal) ->
      try ets:lookup(ibrowse_conf, Key) of
          [#ibrowse_conf{value = Value}] ->
              Value;
          [] ->
              DefVal
      catch
          error:badarg ->
              throw({error, ibrowse_not_running})
      end.
  %% Store Val under Key in ibrowse_conf, replacing any existing entry for
  %% Key. ibrowse_conf is created protected in init/1, so only the owning
  %% ibrowse server process can write to it.
  set_config_value(Key, Val) ->
      Entry = #ibrowse_conf{key = Key, value = Val},
      ets:insert(ibrowse_conf, Entry).
  762. %%--------------------------------------------------------------------
  763. %% Function: handle_call/3
  764. %% Description: Handling call messages
  765. %% Returns: {reply, Reply, State} |
  766. %% {reply, Reply, State, Timeout} |
  767. %% {noreply, State} |
  768. %% {noreply, State, Timeout} |
  769. %% {stop, Reason, Reply, State} | (terminate/2 is called)
  770. %% {stop, Reason, State} (terminate/2 is called)
  771. %%--------------------------------------------------------------------
  %% gen_server handle_call/3 callback.
  %%   {get_lb_pid, Url}             -> the load-balancer pid for Url's host/port
  %%   stop                          -> stops every load balancer, then this server
  %%   {set_config_value, Key, Val}  -> ok
  %%   rescan_config / {rescan_config, File} / {rescan_config_terms, Terms} /
  %%   {add_config_terms, Terms}     -> result of the (caught) config operation
  %%   anything else                 -> {unknown_request, Request}
  handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
      Pid = do_get_connection(Url, ets:lookup(ibrowse_lb, {Host, Port})),
      {reply, Pid, State};
  handle_call(stop, _From, State) ->
      do_trace("IBROWSE shutting down~n", []),
      %% Stop every registered load balancer. Rows that are not #lb_pid{}
      %% records are skipped, as handle_info/2 does, so that one unexpected
      %% row cannot crash the server with function_clause and leave the
      %% remaining load balancers running.
      lists:foreach(fun(#lb_pid{pid = Pid}) ->
                            ibrowse_lb:stop(Pid);
                       (_Other) ->
                            ok
                    end, ets:tab2list(ibrowse_lb)),
      {stop, normal, ok, State};
  handle_call({set_config_value, Key, Val}, _From, State) ->
      set_config_value(Key, Val),
      {reply, ok, State};
  handle_call(rescan_config, _From, State) ->
      Ret = (catch import_config()),
      {reply, Ret, State};
  handle_call({rescan_config, File}, _From, State) ->
      Ret = (catch import_config(File)),
      {reply, Ret, State};
  handle_call({rescan_config_terms, Terms}, _From, State) ->
      Ret = (catch apply_config(Terms)),
      {reply, Ret, State};
  handle_call({add_config_terms, Terms}, _From, State) ->
      Ret = (catch insert_config(Terms)),
      {reply, Ret, State};
  handle_call(Request, _From, State) ->
      Reply = {unknown_request, Request},
      {reply, Reply, State}.
  800. %%--------------------------------------------------------------------
  801. %% Function: handle_cast/2
  802. %% Description: Handling cast messages
  803. %% Returns: {noreply, State} |
  804. %% {noreply, State, Timeout} |
  805. %% {stop, Reason, State} (terminate/2 is called)
  806. %%--------------------------------------------------------------------
  %% gen_server handle_cast/2 callback: every cast is ignored and the state
  %% is returned unchanged.
  handle_cast(_Request, CurrentState) ->
      {noreply, CurrentState}.
  809. %%--------------------------------------------------------------------
  810. %% Function: handle_info/2
  811. %% Description: Handling all non call/cast messages
  812. %% Returns: {noreply, State} |
  813. %% {noreply, State, Timeout} |
  814. %% {stop, Reason, State} (terminate/2 is called)
  815. %%--------------------------------------------------------------------
  %% gen_server handle_info/2 callback.
  %%   all_trace_off
  %%       sends {trace, false} to the load balancer of every destination
  %%       whose {trace, Host, Port} entry in ibrowse_conf is true, then
  %%       deletes those true entries;
  %%   {trace, Bool}
  %%       sets this server's own trace flag;
  %%   {trace, Bool, Host, Port}
  %%       forwards {trace, Bool} to the load balancer(s) registered for
  %%       Host:Port and records the setting under {trace, Host, Port};
  %%   anything else is ignored.
  %% Rows of ibrowse_lb that are not #lb_pid{} records with a {H, P}
  %% host_port are skipped.
  handle_info(all_trace_off, State) ->
      %% Raw tuple form of an ibrowse_conf row {trace, Host, Port} => true.
      TraceOnHead = {ibrowse_conf, {trace, '$1', '$2'}, true},
      Trace_on_dests = ets:select(ibrowse_conf,
                                  [{TraceOnHead, [], [{{'$1', '$2'}}]}]),
      lists:foreach(
        fun(#lb_pid{host_port = {H, P}, pid = Pid}) ->
                case lists:member({H, P}, Trace_on_dests) of
                    true -> catch Pid ! {trace, false};
                    false -> ok
                end;
           (_Other) ->
                ok
        end, ets:tab2list(ibrowse_lb)),
      ets:select_delete(ibrowse_conf, [{TraceOnHead, [], [true]}]),
      {noreply, State};
  handle_info({trace, Bool}, State) ->
      put(my_trace_flag, Bool),
      {noreply, State};
  handle_info({trace, Bool, Host, Port}, State) ->
      lists:foreach(
        fun(#lb_pid{host_port = {H, P}, pid = Pid}) when H == Host,
                                                        P == Port ->
                catch Pid ! {trace, Bool};
           (_Other) ->
                ok
        end, ets:tab2list(ibrowse_lb)),
      ets:insert(ibrowse_conf, #ibrowse_conf{key = {trace, Host, Port},
                                             value = Bool}),
      {noreply, State};
  handle_info(_Info, State) ->
      {noreply, State}.
  849. %%--------------------------------------------------------------------
  850. %% Function: terminate/2
  851. %% Description: Shutdown the server
  852. %% Returns: any (ignored by gen_server)
  853. %%--------------------------------------------------------------------
  %% gen_server terminate/2 callback. No explicit cleanup is done here; the
  %% ETS tables created in init/1 have no heir, so the runtime deletes them
  %% when this owning process exits.
  terminate(_Why, _LastState) ->
      ok.
  856. %%--------------------------------------------------------------------
  857. %% Func: code_change/3
  858. %% Purpose: Convert process state when code is changed
  859. %% Returns: {ok, NewState}
  860. %%--------------------------------------------------------------------
  %% gen_server code_change/3 callback: the state layout does not change
  %% across versions, so it is returned as is.
  code_change(_FromVsn, OldState, _Extra) ->
      {ok, OldState}.
  863. %%--------------------------------------------------------------------
  864. %%% Internal functions
  865. %%--------------------------------------------------------------------
  %% Return the load-balancer pid for the URL's {Host, Port}.
  %% The second argument is the caller's ets:lookup(ibrowse_lb, {Host, Port})
  %% result. An existing entry is reused. Otherwise a new load balancer is
  %% started with ibrowse_lb:start_link/1 and registered in ibrowse_lb.
  do_get_connection(_Url, [#lb_pid{pid = ExistingPid}]) ->
      ExistingPid;
  do_get_connection(#url{host = Host, port = Port}, []) ->
      {ok, NewPid} = ibrowse_lb:start_link([Host, Port]),
      ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = NewPid}),
      NewPid.