You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1031 lines
42 KiB

11 years ago
14 years ago
14 years ago
13 years ago
13 years ago
14 years ago
11 years ago
9 years ago
9 years ago
14 years ago
14 years ago
14 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
10 years ago
13 years ago
13 years ago
13 years ago
  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse.erl
  3. %%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description : Load balancer process for HTTP client connections.
  5. %%%
  6. %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
  9. %% @copyright 2005-2014 Chandrashekhar Mullaparthi
  10. %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
  11. %% module implements the API of the HTTP client. There is one named
  12. %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
  13. %% one process to handle one TCP connection to a webserver
  14. %% (implemented in the module ibrowse_http_client). Multiple connections to a
  15. %% webserver are setup based on the settings for each webserver. The
  16. %% ibrowse process also determines which connection to pipeline a
  17. %% certain request on. The functions to call are send_req/3,
  18. %% send_req/4, send_req/5, send_req/6.
  19. %%
  20. %% <p>Here are a few sample invocations.</p>
  21. %%
  22. %% <code>
  23. %% ibrowse:send_req("http://intranet/messenger/", [], get).
  24. %% <br/><br/>
  25. %%
  26. %% ibrowse:send_req("http://www.google.com/", [], get, [],
  27. %% [{proxy_user, "XXXXX"},
  28. %% {proxy_password, "XXXXX"},
  29. %% {proxy_host, "proxy"},
  30. %% {proxy_port, 8080}], 1000).
  31. %% <br/><br/>
  32. %%
  33. %%ibrowse:send_req("http://www.erlang.org/download/otp_src_R10B-3.tar.gz", [], get, [],
  34. %% [{proxy_user, "XXXXX"},
  35. %% {proxy_password, "XXXXX"},
  36. %% {proxy_host, "proxy"},
  37. %% {proxy_port, 8080},
  38. %% {save_response_to_file, true}], 1000).
  39. %% <br/><br/>
  40. %%
  41. %% ibrowse:send_req("http://www.erlang.org", [], head).
  42. %%
  43. %% <br/><br/>
  44. %% ibrowse:send_req("http://www.sun.com", [], options).
  45. %%
  46. %% <br/><br/>
  47. %% ibrowse:send_req("http://www.bbc.co.uk", [], trace).
  48. %%
  49. %% <br/><br/>
  50. %% ibrowse:send_req("http://www.google.com", [], get, [],
  51. %% [{stream_to, self()}]).
  52. %% </code>
  53. %%
  54. -module(ibrowse).
  55. -behaviour(gen_server).
  56. %%--------------------------------------------------------------------
  57. %% Include files
  58. %%--------------------------------------------------------------------
  59. %%--------------------------------------------------------------------
  60. %% External exports
  61. -export([start_link/0, start/0, stop/0]).
  62. %% gen_server callbacks
  63. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  64. terminate/2, code_change/3]).
  65. %% API interface
  66. -export([
  67. rescan_config/0,
  68. rescan_config/1,
  69. add_config/1,
  70. get_config_value/1,
  71. get_config_value/2,
  72. spawn_worker_process/1,
  73. spawn_worker_process/2,
  74. spawn_link_worker_process/1,
  75. spawn_link_worker_process/2,
  76. stop_worker_process/1,
  77. send_req/3,
  78. send_req/4,
  79. send_req/5,
  80. send_req/6,
  81. send_req_direct/4,
  82. send_req_direct/5,
  83. send_req_direct/6,
  84. send_req_direct/7,
  85. stream_next/1,
  86. stream_close/1,
  87. set_max_sessions/3,
  88. set_max_pipeline_size/3,
  89. set_max_attempts/3,
  90. set_download_dir/1,
  91. set_dest/3,
  92. trace_on/0,
  93. trace_off/0,
  94. trace_on/2,
  95. trace_off/2,
  96. all_trace_off/0,
  97. show_dest_status/0,
  98. show_dest_status/1,
  99. show_dest_status/2,
  100. get_metrics/0,
  101. get_metrics/2
  102. ]).
  103. -ifdef(debug).
  104. -compile(export_all).
  105. -endif.
  106. -import(ibrowse_lib, [
  107. parse_url/1,
  108. get_value/3,
  109. do_trace/2,
  110. log_msg/2
  111. ]).
  112. -record(state, {trace = false}).
  113. -include("ibrowse.hrl").
  114. -include_lib("stdlib/include/ms_transform.hrl").
  115. -define(DEF_MAX_SESSIONS,10).
  116. -define(DEF_MAX_PIPELINE_SIZE,10).
  117. -define(DEF_MAX_ATTEMPTS,3).
  118. %%====================================================================
  119. %% External functions
  120. %%====================================================================
  121. %%--------------------------------------------------------------------
  122. %% Function: start_link/0
  123. %% Description: Starts the server
  124. %%--------------------------------------------------------------------
  %% @doc Start the ibrowse server under its locally registered name and link
  %% it to the calling process. Usually invoked by the supervisor ibrowse_sup.
  %% @spec start_link() -> {ok, pid()}
  start_link() ->
      Server_name = {local, ?MODULE},
      gen_server:start_link(Server_name, ?MODULE, [], []).
  %% @doc Start the ibrowse server under its locally registered name without
  %% linking it to the caller. Handy when experimenting from the shell.
  start() ->
      Start_opts = [{debug, []}],
      gen_server:start({local, ?MODULE}, ?MODULE, [], Start_opts).
  %% @doc Ask the ibrowse server to stop. Handy when experimenting from the
  %% shell. Returns ok when no process is registered as ibrowse; any other
  %% outcome of the call (including a caught exit) is returned unchanged.
  stop() ->
      Outcome = (catch gen_server:call(ibrowse, stop)),
      case Outcome of
          {'EXIT', {noproc, _}} ->
              %% Nothing to stop: the server is not running.
              ok;
          _ ->
              Outcome
      end.
  140. %% @doc This is the basic function to send a HTTP request.
  141. %% The Status return value indicates the HTTP status code returned by the webserver
  142. %% @spec send_req(Url::string(), Headers::headerList(), Method::method()) -> response()
  143. %% headerList() = [{header(), value()}]
  144. %% header() = atom() | string() | binary()
  145. %% value() = term()
  146. %% method() = get | post | head | options | put | delete | trace | mkcol | propfind | proppatch | lock | unlock | move | copy
  147. %% Status = string()
  148. %% ResponseHeaders = [respHeader()]
  149. %% respHeader() = {headerName(), headerValue()}
  150. %% headerName() = string()
  151. %% headerValue() = string()
  152. %% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
  153. %% req_id() = term()
  154. %% ResponseBody = string() | {file, Filename}
  155. %% Reason = term()
  %% Convenience arity: no request body and no options.
  send_req(Url, Headers, Method) ->
      No_body = [],
      No_options = [],
      send_req(Url, Headers, Method, No_body, No_options).
  158. %% @doc Same as send_req/3.
  159. %% If a list is specified for the body it has to be a flat list. The body can also be a fun/0 or a fun/1. <br/>
  160. %% If fun/0, the connection handling process will repeatedly call the fun until it returns an error or eof. <pre>Fun() = {ok, Data} | eof</pre><br/>
  161. %% If fun/1, the connection handling process will repeatedly call the fun with the supplied state until it returns an error or eof. <pre>Fun(State) = {ok, Data} | {ok, Data, NewState} | eof</pre>
  162. %% @spec send_req(Url, Headers, Method::method(), Body::body()) -> response()
  163. %% body() = [] | string() | binary() | fun_arity_0() | {fun_arity_1(), initial_state()}
  164. %% initial_state() = term()
  %% Convenience arity: caller supplies a body, options default to [].
  send_req(Url, Headers, Method, Body) ->
      No_options = [],
      send_req(Url, Headers, Method, Body, No_options).
  167. %% @doc Same as send_req/4.
  168. %% For a description of SSL Options, look in the <a href="http://www.erlang.org/doc/apps/ssl/index.html">ssl</a> manpage.
  169. %% For a description of Process Options, look in the <a href="http://www.erlang.org/doc/man/gen_server.html">gen_server</a> manpage.
  170. %% If the HTTP Version to use is not specified, the default is 1.1.
  171. %% <br/>
  172. %% <ul>
  173. %% <li>The <code>host_header</code> option is useful in the case where ibrowse is
  174. %% connecting to a component such as <a
  175. %% href="http://www.stunnel.org">stunnel</a> which then sets up a
  176. %% secure connection to a webserver. In this case, the URL supplied to
  177. %% ibrowse must have the stunnel host/port details, but that won't
  178. %% make sense to the destination webserver. This option can then be
  179. %% used to specify what should go in the <code>Host</code> header in
  180. %% the request.</li>
  181. %% <li>The <code>stream_to</code> option can be used to have the HTTP
  182. %% response streamed to a process as messages as data arrives on the
  183. %% socket. If the calling process wishes to control the rate at which
  184. %% data is received from the server, the option <code>{stream_to,
  185. %% {process(), once}}</code> can be specified. The calling process
  186. %% will have to invoke <code>ibrowse:stream_next(Request_id)</code> to
  187. %% receive the next packet.</li>
  188. %%
  189. %% <li>When both the options <code>save_response_to_file</code> and <code>stream_to</code>
  190. %% are specified, the former takes precedence.</li>
  191. %%
  192. %% <li>For the <code>save_response_to_file</code> option, the response body is saved to
  193. %% file only if the status code is in the 200-299 range. If not, the response body is returned
  194. %% as a string.</li>
  195. %% <li>Whenever an error occurs in the processing of a request, ibrowse will return as much
  196. %% information as it has, such as HTTP Status Code and HTTP Headers. When this happens, the response
  197. %% is of the form <code>{error, {Reason, {stat_code, StatusCode}, HTTP_headers}}</code></li>
  198. %%
  199. %% <li>The <code>inactivity_timeout</code> option is useful when
  200. %% dealing with large response bodies and/or slow links. In these
  201. %% cases, it might be hard to estimate how long a request will take to
  202. %% complete. In such cases, the client might want to timeout if no
  203. %% data has been received on the link for a certain time interval.
  204. %%
  205. %% This value is also used to close connections which are not in use for
  206. %% the specified timeout value.
  207. %% </li>
  208. %%
  209. %% <li>
  210. %% The <code>connect_timeout</code> option is to specify how long the
  211. %% client process should wait for connection establishment. This is
  212. %% useful in scenarios where connections to servers are usually setup
  213. %% very fast, but responses might take much longer compared to
  214. %% connection setup. In such cases, it is better for the calling
  215. %% process to timeout faster if there is a problem (DNS lookup
  216. %% delays/failures, network routing issues, etc). The total timeout
  217. %% value specified for the request will be enforced. To illustrate using
  218. %% an example:
  219. %% <code>
  220. %% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000).
  221. %% </code>
  222. %% In the above invocation, if the connection isn't established within
  223. %% 100 milliseconds, the request will fail with
  224. %% <code>{error, conn_failed}</code>.<br/>
  225. %% If connection setup succeeds, the total time allowed for the
  226. %% request to complete will be 1000 milliseconds minus the time taken
  227. %% for connection setup.
  228. %% </li>
  229. %%
  230. %% <li> The <code>socket_options</code> option can be used to set
  231. %% specific options on the socket. The <code>{active, true | false | once}</code>
  232. %% and <code>{packet_type, Packet_type}</code> will be filtered out by ibrowse. </li>
  233. %%
  234. %% <li> The <code>headers_as_is</code> option is to enable the caller
  235. %% to send headers exactly as specified in the request without ibrowse
  236. %% adding some of its own. Required for some picky servers apparently. </li>
  237. %%
  238. %% <li>The <code>give_raw_headers</code> option is to enable the
  239. %% caller to get access to the raw status line and raw unparsed
  240. %% headers. Not quite sure why someone would want this, but one of my
  241. %% users asked for it, so here it is. </li>
  242. %%
  243. %% <li> The <code>preserve_status_line</code> option is to get the raw status line as a custom header
  244. %% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
  245. %% If both the <code>give_raw_headers</code> and <code>preserve_status_line</code> are specified
  246. %% in a request, only the <code>give_raw_headers</code> is honoured. </li>
  247. %%
  248. %% <li> The <code>preserve_chunked_encoding</code> option enables the caller
  249. %% to receive the raw data stream when the Transfer-Encoding of the server
  250. %% response is Chunked.
  251. %% </li>
  252. %% <li> The <code>return_raw_request</code> option enables the caller to get the exact request which was sent by ibrowse to the server, along with the response. When this option is used, the response for synchronous requests is a 5-tuple instead of the usual 4-tuple. For asynchronous requests, the calling process gets a message <code>{ibrowse_async_raw_req, Raw_req}</code>.
  253. %% </li>
  254. %% </ul>
  255. %%
  256. %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
  257. %% optionList() = [option()]
  258. %% option() = {max_sessions, integer()} |
  259. %% {response_format,response_format()}|
  260. %% {stream_full_chunks, boolean()} |
  261. %% {stream_chunk_size, integer()} |
  262. %% {max_pipeline_size, integer()} |
  263. %% {trace, boolean()} |
  264. %% {is_ssl, boolean()} |
  265. %% {ssl_options, [SSLOpt]} |
  266. %% {pool_name, atom()} |
  267. %% {proxy_host, string()} |
  268. %% {proxy_port, integer()} |
  269. %% {proxy_user, string()} |
  270. %% {proxy_password, string()} |
  271. %% {use_absolute_uri, boolean()} |
  272. %% {basic_auth, {username(), password()}} |
  273. %% {cookie, string()} |
  274. %% {content_length, integer()} |
  275. %% {content_type, string()} |
  276. %% {save_response_to_file, srtf()} |
  277. %% {stream_to, stream_to()} |
  278. %% {http_vsn, {MajorVsn, MinorVsn}} |
  279. %% {host_header, string()} |
  280. %% {inactivity_timeout, integer()} |
  281. %% {connect_timeout, integer()} |
  282. %% {socket_options, Sock_opts} |
  283. %% {transfer_encoding, {chunked, ChunkSize}} |
  284. %% {headers_as_is, boolean()} |
  285. %% {give_raw_headers, boolean()} |
  286. %% {preserve_chunked_encoding,boolean()} |
  287. %% {workaround, head_response_with_body} |
  288. %% {worker_process_options, list()} |
  289. %% {return_raw_request, true} |
  290. %% {max_attempts, integer()} |
  291. %% {socks5_host, host()} |
  292. %% {socks5_port, integer()} |
  293. %% {socks5_user, binary()} |
  294. %% {socks5_password, binary()}
  295. %%
  296. %% ip4_address() = {0..255, 0..255, 0..255, 0..255}
  297. %% ip6_address() =
  298. %% {0..65535,
  299. %% 0..65535,
  300. %% 0..65535,
  301. %% 0..65535,
  302. %% 0..65535,
  303. %% 0..65535,
  304. %% 0..65535,
  305. %% 0..65535}
  306. %% host() = string() | ip4_address() | ip6_address()
  307. %% stream_to() = process() | {process(), once}
  308. %% process() = pid() | atom()
  309. %% username() = string()
  310. %% password() = string()
  311. %% SSLOpt = term()
  312. %% Sock_opts = [Sock_opt]
  313. %% Sock_opt = term()
  314. %% ChunkSize = integer()
  315. %% srtf() = boolean() | filename() | {append, filename()}
  316. %% filename() = string()
  317. %% response_format() = list | binary
  %% Convenience arity: applies the default request timeout of 30000 ms.
  send_req(Url, Headers, Method, Body, Options) ->
      Default_timeout = 30000,
      send_req(Url, Headers, Method, Body, Options, Default_timeout).
  %% @doc Same as send_req/5, with an explicit timeout.
  %% All timeout values are in milliseconds.
  %% @spec send_req(Url, Headers::headerList(), Method::method(), Body::body(), Options::optionList(), Timeout) -> response()
  %% Timeout = integer() | infinity
  send_req(Url, Headers, Method, Body, Options, Timeout) ->
      case catch parse_url(Url) of
          #url{host = Host, port = Port, protocol = Protocol} = Parsed_url ->
              %% Reuse the load balancer already recorded for this destination,
              %% otherwise ask the ibrowse server for one.
              Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
                           [#lb_pid{pid = Known_lb_pid}] ->
                               Known_lb_pid;
                           [] ->
                               get_lb_pid(Parsed_url)
                       end,
              Max_sessions = get_max_sessions(Host, Port, Options),
              Max_pipeline_size = get_max_pipeline_size(Host, Port, Options),
              Max_attempts = get_max_attempts(Host, Port, Options),
              Options_1 = merge_options(Host, Port, Options),
              %% SSL is used for https URLs or when explicitly requested via
              %% the is_ssl option; ssl_options are only read in that case.
              Use_ssl = (Protocol == https) orelse
                  get_value(is_ssl, Options_1, false),
              Ssl_spec = case Use_ssl of
                             true ->
                                 {get_value(ssl_options, Options_1, []), true};
                             false ->
                                 {[], false}
                         end,
              try_routing_request(Lb_pid, Parsed_url,
                                  Max_sessions,
                                  Max_pipeline_size,
                                  Ssl_spec,
                                  Headers, Method, Body, Options_1,
                                  Timeout, Timeout, os:timestamp(),
                                  Max_attempts, 0);
          Parse_failure ->
              {error, {url_parsing_failed, Parse_failure}}
      end.
  %% Obtain a connection process from the load balancer and send the request
  %% on it. When the selected connection turns out to be closed
  %% ({error, sel_conn_closed}) the request is retried on another connection,
  %% as long as fewer than Max_attempts tries have been made and enough of the
  %% original time budget is left. Timeout is the budget for this attempt;
  %% Ori_timeout and Req_start_time describe the caller's overall budget.
  %% Returns whatever do_send_req/7 or ibrowse_lb:spawn_connection/6 returned,
  %% or {error, retry_later} once retries are exhausted.
  try_routing_request(Lb_pid, Parsed_url,
                      Max_sessions,
                      Max_pipeline_size,
                      {SSLOptions, IsSSL},
                      Headers, Method, Body, Options_1, Timeout,
                      Ori_timeout, Req_start_time, Max_attempts, Try_count)
    when Try_count < Max_attempts ->
      ProcessOptions = get_value(worker_process_options, Options_1, []),
      case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
                                       Max_sessions,
                                       Max_pipeline_size,
                                       {SSLOptions, IsSSL},
                                       ProcessOptions) of
          {ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
              case do_send_req(Conn_Pid, Parsed_url, Headers,
                               Method, Body, Options_1, Timeout) of
                  {error, sel_conn_closed} ->
                      case remaining_retry_time(Ori_timeout, Req_start_time) of
                          {retry, Time_remaining} ->
                              try_routing_request(Lb_pid, Parsed_url,
                                                  Max_sessions,
                                                  Max_pipeline_size,
                                                  {SSLOptions, IsSSL},
                                                  Headers, Method, Body, Options_1,
                                                  Time_remaining, Ori_timeout,
                                                  Req_start_time, Max_attempts,
                                                  Try_count + 1);
                          no_retry ->
                              {error, retry_later}
                      end;
                  Res ->
                      Res
              end;
          Err ->
              Err
      end;
  try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _, _) ->
      {error, retry_later}.

  %% Decide whether another attempt fits in the caller's time budget.
  %% Returns {retry, Time_remaining} when more than 0 ms and at least 5% of
  %% the original timeout is left, otherwise no_retry.
  %% An infinity timeout never runs out. This clause must come first because
  %% the atom infinity compares greater than any number in Erlang term order.
  remaining_retry_time(infinity, _Req_start_time) ->
      {retry, infinity};
  remaining_retry_time(Ori_timeout, Req_start_time)
    when is_number(Ori_timeout), Ori_timeout > 0 ->
      Time_taken_so_far =
          round(timer:now_diff(os:timestamp(), Req_start_time) / 1000),
      Time_remaining = Ori_timeout - Time_taken_so_far,
      Time_remaining_percent = round((Time_remaining / Ori_timeout) * 100),
      case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
          true ->
              {retry, Time_remaining};
          false ->
              no_retry
      end;
  %% A zero, negative or otherwise unusable budget leaves nothing to retry
  %% with (this also avoids a division by zero).
  remaining_retry_time(_Ori_timeout, _Req_start_time) ->
      no_retry.
  %% Combine the caller's Options with the configured defaults for this
  %% Host:Port and the global defaults. A configured {Key, Val} is added only
  %% when the caller did not supply that key; destination-specific defaults
  %% are visited before global ones.
  merge_options(Host, Port, Options) ->
      Dest_defaults = get_config_value({options, Host, Port}, []),
      Global_defaults = get_config_value({options, global}, []),
      Add_if_absent =
          fun({Key, _Val} = Entry, Acc) ->
                  case lists:keymember(Key, 1, Options) of
                      true ->
                          Acc;
                      false ->
                          [Entry | Acc]
                  end
          end,
      lists:foldl(Add_if_absent, Options, Dest_defaults ++ Global_defaults).
  %% Ask the ibrowse server for the load balancer pid serving Url.
  get_lb_pid(Url) ->
      Request = {get_lb_pid, Url},
      gen_server:call(?MODULE, Request).
  %% Resolve max_sessions: request option first, then the per-destination
  %% config value, then the application-wide default.
  get_max_sessions(Host, Port, Options) ->
      Fallback = get_config_value({max_sessions, Host, Port},
                                  default_max_sessions()),
      get_value(max_sessions, Options, Fallback).
  %% Resolve max_pipeline_size: request option first, then the
  %% per-destination config value, then the application-wide default.
  get_max_pipeline_size(Host, Port, Options) ->
      Fallback = get_config_value({max_pipeline_size, Host, Port},
                                  default_max_pipeline_size()),
      get_value(max_pipeline_size, Options, Fallback).
  %% Resolve max_attempts: request option first, then the per-destination
  %% config value, then the application-wide default.
  get_max_attempts(Host, Port, Options) ->
      Fallback = get_config_value({max_attempts, Host, Port},
                                  default_max_attempts()),
      get_value(max_attempts, Options, Fallback).
  %% Application env value default_max_sessions, or ?DEF_MAX_SESSIONS if unset.
  default_max_sessions() ->
      Env_key = default_max_sessions,
      safe_get_env(ibrowse, Env_key, ?DEF_MAX_SESSIONS).
  %% Application env value default_max_pipeline_size, or
  %% ?DEF_MAX_PIPELINE_SIZE if unset.
  default_max_pipeline_size() ->
      Env_key = default_max_pipeline_size,
      safe_get_env(ibrowse, Env_key, ?DEF_MAX_PIPELINE_SIZE).
  %% Application env value default_max_attempts, or ?DEF_MAX_ATTEMPTS if unset.
  default_max_attempts() ->
      Env_key = default_max_attempts,
      safe_get_env(ibrowse, Env_key, ?DEF_MAX_ATTEMPTS).
  %% Read Key from the application environment of App, returning Def_val when
  %% the key is not set.
  safe_get_env(App, Key, Def_val) ->
      Lookup = application:get_env(App, Key),
      case Lookup of
          {ok, Configured} ->
              Configured;
          undefined ->
              Def_val
      end.
  %% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
  %% for achieving the same effect.
  %% Applies each option in the list to the given Host:Port, in order:
  %% {max_sessions, Max}, {max_pipeline_size, Max} and {trace, boolean()}.
  %% Returns ok once the list is exhausted; exits with {invalid_option, Opt}
  %% on the first option that is not recognised.
  set_dest(_, _, []) ->
      ok;
  set_dest(Host, Port, [{max_sessions, Max} | T]) ->
      set_max_sessions(Host, Port, Max),
      set_dest(Host, Port, T);
  set_dest(Host, Port, [{max_pipeline_size, Max} | T]) ->
      set_max_pipeline_size(Host, Port, Max),
      set_dest(Host, Port, T);
  set_dest(Host, Port, [{trace, Bool} | T]) when is_boolean(Bool) ->
      %% Forward the requested flag; previously `true' was always sent, so
      %% {trace, false} switched tracing on instead of off.
      ibrowse ! {trace, Bool, Host, Port},
      set_dest(Host, Port, T);
  set_dest(_Host, _Port, [H | _]) ->
      exit({invalid_option, H}).
  %% @doc Set the maximum number of connections allowed to a specific
  %% Host:Port. Max must be a positive integer.
  %% @spec set_max_sessions(Host::string(), Port::integer(), Max::integer()) -> ok
  set_max_sessions(Host, Port, Max) when is_integer(Max), Max > 0 ->
      Config_key = {max_sessions, Host, Port},
      gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
  %% @doc Set the maximum pipeline size for each connection to a specific
  %% Host:Port. Max must be a positive integer.
  %% @spec set_max_pipeline_size(Host::string(), Port::integer(), Max::integer()) -> ok
  set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 ->
      Config_key = {max_pipeline_size, Host, Port},
      gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
  %% @doc Set the maximum attempts for each connection to a specific
  %% Host:Port. Max must be a positive integer.
  %% @spec set_max_attempts(Host::string(), Port::integer(), Max::integer()) -> ok
  set_max_attempts(Host, Port, Max) when is_integer(Max), Max > 0 ->
      Config_key = {max_attempts, Host, Port},
      gen_server:call(?MODULE, {set_config_value, Config_key, Max}).
  %% @doc Set the download directory where the files are stored.
  %% @spec set_download_dir(Dir::string()) -> ok
  set_download_dir(Dir) ->
      Request = {set_config_value, download_dir, Dir},
      gen_server:call(?MODULE, Request).
  %% Send one request on the connection process Conn_Pid and normalise the
  %% outcome:
  %%   * a gen_server call timeout, or the connection process exiting with
  %%     reason normal during the call, is logged and reported as
  %%     {error, req_timedout};
  %%   * errors showing that the chosen connection is closed or closing are
  %%     reported as {error, sel_conn_closed} so the caller can retry on
  %%     another connection (connection_closed_no_retry only for get/head);
  %%   * any other exit is wrapped as {error, {'EXIT', Reason}};
  %%   * a successful response whose body is a binary is converted to a list
  %%     when the response_format option is list (the default);
  %%   * everything else is returned unchanged.
  do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
      case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url,
                                              Headers, Method, ensure_bin(Body),
                                              Options, Timeout) of
          {'EXIT', {timeout, _}} ->
              P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
                           [_|_] = Conn_Pid_info_list ->
                               Conn_Pid_info_list;
                           _ ->
                               process_info_not_available
                       end,
              log_msg("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
                      [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info]),
              {error, req_timedout};
          {'EXIT', {normal, _}} = Ex_rsn ->
              log_msg("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
                      [[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn]),
              {error, req_timedout};
          {error, X} when X == connection_closed;
                          X == {send_failed, {error, enotconn}};
                          X == {send_failed,{error,einval}};
                          X == {send_failed,{error,closed}};
                          X == connection_closing;
                          ((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
              {error, sel_conn_closed};
          {error, connection_closed_no_retry} ->
              {error, connection_closed};
          {error, {'EXIT', {noproc, _}}} ->
              {error, sel_conn_closed};
          {'EXIT', Reason} ->
              {error, {'EXIT', Reason}};
          %% The response fields need fresh variable names. Reusing the
          %% already-bound request parameters Headers/Body would turn these
          %% patterns into equality checks against the request, which
          %% practically never match.
          {ok, St_code, Resp_headers, Resp_body} = Ret when is_binary(Resp_body) ->
              case get_value(response_format, Options, list) of
                  list ->
                      {ok, St_code, Resp_headers, binary_to_list(Resp_body)};
                  _ ->
                      %% binary, or an unrecognised format: hand back as is.
                      Ret
              end;
          {ok, St_code, Resp_headers, Resp_body, Req} = Ret when is_binary(Resp_body) ->
              case get_value(response_format, Options, list) of
                  list ->
                      {ok, St_code, Resp_headers, binary_to_list(Resp_body), Req};
                  _ ->
                      Ret
              end;
          Ret ->
              Ret
      end.
  %% Normalise a request body before it is handed to the connection process:
  %% a list becomes a binary, a binary or fun is passed through, a 1-tuple
  %% holding a fun is unwrapped, and a {Fun, State} pair is passed through.
  %% Any other shape fails with function_clause.
  ensure_bin(Bin) when is_binary(Bin) ->
      Bin;
  ensure_bin(Chars) when is_list(Chars) ->
      list_to_binary(Chars);
  ensure_bin(Body_fun) when is_function(Body_fun) ->
      Body_fun;
  ensure_bin({Body_fun, _State} = Fun_with_state) when is_function(Body_fun) ->
      Fun_with_state;
  ensure_bin({Body_fun}) when is_function(Body_fun) ->
      Body_fun.
  516. %% @doc Creates a HTTP client process to the specified Host:Port which
  517. %% is not part of the load balancing pool. This is useful in cases
  518. %% where some requests to a webserver might take a long time whereas
  519. %% some might take a very short time. To avoid getting these quick
  520. %% requests stuck in the pipeline behind time consuming requests, use
  521. %% this function to get a handle to a connection process. <br/>
  522. %% <b>Note:</b> Calling this function only creates a worker process. No connection
  523. %% is setup. The connection attempt is made only when the first
  524. %% request is sent via any of the send_req_direct/4,5,6,7 functions.<br/>
  525. %% <b>Note:</b> It is the responsibility of the calling process to control
  526. %% pipeline size on such connections.
  527. %% @spec spawn_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
  %% Convenience arity: no Erlang process options.
  spawn_worker_process(Args) ->
      No_options = [],
      spawn_worker_process(Args, No_options).
  %% @doc Same as spawn_worker_process/1 except with Erlang process options.
  %% Two call shapes are accepted: the legacy (Host, Port) form, where Host is
  %% a list and Port an integer, and the (Args, Options) form, which is passed
  %% straight to ibrowse_http_client:start/2.
  %% @spec spawn_worker_process(Host::string(), Port::integer()) -> {ok, pid()}
  spawn_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
      %% Old-style call: translate to the {Host, Port} argument format.
      Dest = {Host, Port},
      spawn_worker_process(Dest, []);
  spawn_worker_process(Args, Options) ->
      ibrowse_http_client:start(Args, Options).
  %% @doc Same as spawn_worker_process/1 except that the calling process
  %% is linked to the worker process which is spawned.
  %% @spec spawn_link_worker_process(Url::string() | {Host::string(), Port::integer()}) -> {ok, pid()}
  spawn_link_worker_process(Args) ->
      No_options = [],
      spawn_link_worker_process(Args, No_options).
  %% @doc Same as spawn_link_worker_process/1 except with Erlang process
  %% options. Two call shapes are accepted: the legacy (Host, Port) form, where
  %% Host is a list and Port an integer, and the (Args, Options) form, which is
  %% passed straight to ibrowse_http_client:start_link/2.
  %% @spec spawn_link_worker_process(Host::string(), Port::integer()) -> {ok, pid()}
  spawn_link_worker_process(Host, Port) when is_list(Host), is_integer(Port) ->
      %% Old-style call: translate to the {Host, Port} argument format.
      Dest = {Host, Port},
      spawn_link_worker_process(Dest, []);
  spawn_link_worker_process(Args, Options) ->
      ibrowse_http_client:start_link(Args, Options).
  %% @doc Terminate a worker process that was spawned with
  %% spawn_worker_process/2 or spawn_link_worker_process/2. Requests in
  %% progress will get the error response <pre>{error, closing_on_request}</pre>
  %% @spec stop_worker_process(Conn_pid::pid()) -> ok
  stop_worker_process(Conn_pid) ->
      Stop_result = ibrowse_http_client:stop(Conn_pid),
      Stop_result.
  %% @doc Same as send_req/3 except that the first argument is the PID
  %% returned by spawn_worker_process/2 or spawn_link_worker_process/2.
  %% Uses an empty body and no options.
  send_req_direct(Conn_pid, Url, Headers, Method) ->
      No_body = [],
      No_options = [],
      send_req_direct(Conn_pid, Url, Headers, Method, No_body, No_options).
  %% @doc Same as send_req/4 except that the first argument is the PID
  %% returned by spawn_worker_process/2 or spawn_link_worker_process/2.
  %% Uses no options.
  send_req_direct(Conn_pid, Url, Headers, Method, Body) ->
      No_options = [],
      send_req_direct(Conn_pid, Url, Headers, Method, Body, No_options).
  %% @doc Same as send_req/5 except that the first argument is the PID
  %% returned by spawn_worker_process/2 or spawn_link_worker_process/2.
  %% Applies the default timeout of 30000 ms.
  send_req_direct(Conn_pid, Url, Headers, Method, Body, Options) ->
      Default_timeout = 30000,
      send_req_direct(Conn_pid, Url, Headers, Method, Body, Options,
                      Default_timeout).
  %% @doc Same as send_req/6 except that the first argument is the PID
  %% returned by spawn_worker_process/2 or spawn_link_worker_process/2.
  %% Returns {error, worker_is_dead} when the worker process no longer
  %% exists, and {error, {url_parsing_failed, Err}} when Url cannot be parsed.
  send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
      case catch parse_url(Url) of
          #url{host = Host, port = Port} = Parsed_url ->
              Merged_options = merge_options(Host, Port, Options),
              Reply = do_send_req(Conn_pid, Parsed_url, Headers, Method,
                                  Body, Merged_options, Timeout),
              case Reply of
                  {error, {'EXIT', {noproc, _}}} ->
                      {error, worker_is_dead};
                  _ ->
                      Reply
              end;
          Parse_failure ->
              {error, {url_parsing_failed, Parse_failure}}
      end.
  %% @doc Tell ibrowse to stream the next chunk of data to the
  %% caller. Should be used in conjunction with the
  %% <code>stream_to</code> option.
  %% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
  stream_next(Req_id) ->
      Lookup_key = {req_id_pid, Req_id},
      case ets:lookup(ibrowse_stream, Lookup_key) of
          [{_, Conn_pid}] ->
              %% Best-effort notification: a failing send is ignored.
              catch Conn_pid ! {stream_next, Req_id},
              ok;
          [] ->
              {error, unknown_req_id}
      end.
  %% @doc Tell ibrowse to close the connection associated with the
  %% specified stream. Should be used in conjunction with the
  %% <code>stream_to</code> option. Note that all requests in progress on
  %% the connection which is serving this Req_id will be aborted, and an
  %% error returned.
  %% @spec stream_close(Req_id :: req_id()) -> ok | {error, unknown_req_id}
  stream_close(Req_id) ->
      Lookup_key = {req_id_pid, Req_id},
      case ets:lookup(ibrowse_stream, Lookup_key) of
          [{_, Conn_pid}] ->
              %% Best-effort notification: a failing send is ignored.
              catch Conn_pid ! {stream_close, Req_id},
              ok;
          [] ->
              {error, unknown_req_id}
      end.
  %% @doc Turn tracing on for the ibrowse process. Returns the message sent.
  trace_on() ->
      Msg = {trace, true},
      ibrowse ! Msg.
  %% @doc Turn tracing off for the ibrowse process. Returns the message sent.
  trace_off() ->
      Msg = {trace, false},
      ibrowse ! Msg.
  %% @doc Turn tracing on for all connections to the specified HTTP
  %% server. Host is whatever is specified as the domain name in the URL.
  %% @spec trace_on(Host, Port) -> ok
  %% Host = string()
  %% Port = integer()
  trace_on(Host, Port) ->
      Msg = {trace, true, Host, Port},
      ibrowse ! Msg,
      ok.
  %% @doc Turn tracing OFF for all connections to the specified HTTP
  %% server.
  %% @spec trace_off(Host, Port) -> ok
  trace_off(Host, Port) ->
      Msg = {trace, false, Host, Port},
      ibrowse ! Msg,
      ok.
  %% @doc Turn off ALL tracing.
  %% @spec all_trace_off() -> ok
  all_trace_off() ->
      Msg = all_trace_off,
      ibrowse ! Msg,
      ok.
  %% @doc Shows some internal information about load balancing. Info
  %% about workers spawned using spawn_worker_process/2 or
  %% spawn_link_worker_process/2 is not included.
  %% Prints one row per destination returned by get_metrics/0: server:port,
  %% ETS table id, number of connections and load balancer pid.
  show_dest_status() ->
      io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n",
                ["Server:port", "ETS", "Num conns", "LB Pid"]),
      io:format("~80.80.=s~n", [""]),
      lists:foreach(
        fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
                %% The table id may be an integer, a reference or the atom
                %% undefined (no table yet), so render it with ~p rather than
                %% a type-specific conversion that crashes on the other shapes.
                Tid_text = lists:flatten(io_lib:format("~p", [Tid])),
                io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
                          [Host ++ ":" ++ integer_to_list(Port),
                           Tid_text,
                           integer_to_list(Size),
                           Lb_pid])
        end, get_metrics()).
  %% Same as show_dest_status/2, with the destination taken from a URL.
  show_dest_status(Url) ->
      Parsed = ibrowse_lib:parse_url(Url),
      #url{host = Host, port = Port} = Parsed,
      show_dest_status(Host, Port).
  %% @doc Prints internal load balancing information for one specific
  %% Host:Port. Workers spawned using spawn_worker_process/2 or
  %% spawn_link_worker_process/2 are not included. When get_metrics/2
  %% does not yield a metrics tuple, "Metrics not available" is printed
  %% instead.
  show_dest_status(Host, Port) ->
      Report_format = "Load Balancer Pid : ~p~n"
                      "LB process msg q size : ~p~n"
                      "LB ETS table id : ~p~n"
                      "Num Connections : ~p~n"
                      "Smallest pipeline : ~p~n"
                      "Largest pipeline : ~p~n",
      case get_metrics(Host, Port) of
          {Lb_pid, Q_len, Tid, Num_conns, {{Min_sz, Min_sz}, {Max_sz, Max_sz}}} ->
              io:format(Report_format,
                        [Lb_pid, Q_len, Tid, Num_conns, Min_sz, Max_sz]);
          _Unavailable ->
              io:format("Metrics not available~n", [])
      end.
  %% Collects metrics for every destination registered in the ibrowse_lb
  %% table whose key is a {Host, Port} pair with a list Host and an
  %% integer Port. Returns a list of {Host, Port, Metrics} where Metrics
  %% is the 5-tuple produced by get_metrics/2; destinations for which no
  %% such tuple is available are left out.
  get_metrics() ->
      Collect =
          fun(#lb_pid{host_port = {Host, Port}}, Acc)
                when is_list(Host), is_integer(Port) ->
                  case get_metrics(Host, Port) of
                      {_, _, _, _, _} = Metrics ->
                          [{Host, Port, Metrics} | Acc];
                      _No_metrics ->
                          Acc
                  end;
             (_Other_entry, Acc) ->
                  %% Not a Host/Port destination entry: ignore it.
                  Acc
          end,
      lists:foldl(Collect, [], ets:tab2list(ibrowse_lb)).
  %% Collects metrics for the load balancer serving Host:Port.
  %%
  %% Returns:
  %%   no_active_processes - nothing is registered for Host:Port
  %%   not_available       - the load balancer's ETS table could not be
  %%                         inspected
  %%   {Lb_pid, MsgQueueLen, Tid, NumConns, {{Min, Min}, {Max, Max}}}
  %%       where MsgQueueLen is -1 when it cannot be determined, and Min
  %%       and Max are the first elements of the first and last keys of
  %%       the load balancer's ETS table (0 when there is no table or it
  %%       is empty).
  get_metrics(Host, Port) ->
      case ets:lookup(ibrowse_lb, {Host, Port}) of
          [#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
              Q_len = try process_info(Lb_pid, message_queue_len) of
                          {message_queue_len, Len} -> Len;
                          _ -> -1
                      catch
                          _:_ -> -1
                      end,
              No_pipelines = {{0, 0}, {0, 0}},
              if
                  Tid =:= undefined ->
                      {Lb_pid, Q_len, undefined, 0, No_pipelines};
                  true ->
                      %% The whole inspection is protected: any failure
                      %% while reading the table yields not_available.
                      try
                          case ets:info(Tid, size) of
                              0 ->
                                  {Lb_pid, Q_len, Tid, 0, No_pipelines};
                              Num_conns ->
                                  {Smallest, _, _} = ets:first(Tid),
                                  {Largest, _, _} = ets:last(Tid),
                                  {Lb_pid, Q_len, Tid, Num_conns,
                                   {{Smallest, Smallest}, {Largest, Largest}}}
                          end
                      catch
                          _:_ ->
                              not_available
                      end
              end;
          [] ->
              no_active_processes
      end.
  %% @doc Reload the ibrowse configuration from the file ibrowse.conf in
  %% the IBROWSE_EBIN/../priv directory. The current configuration is
  %% cleared only if that file can be read using file:consult/1. The work
  %% is done by the ibrowse server; this call waits for its reply.
  rescan_config() ->
      Request = rescan_config,
      gen_server:call(?MODULE, Request).
  %% @doc Clear the current configuration for ibrowse and load a new one.
  %%
  %% The argument is either a non-empty list of configuration terms
  %% (for example {Key, Value} pairs or
  %% {dest, Host, Port, MaxSess, MaxPipe, Options} tuples), which replaces
  %% the current configuration, or the name of a file. With a file name
  %% the current configuration is cleared only if the file is readable
  %% using file:consult/1.
  rescan_config([Term | _] = Terms) when is_tuple(Term) ->
      %% A file name never starts with a tuple, so any list headed by a
      %% tuple is a list of configuration terms. Matching only on 2-tuples
      %% here would send a list headed by a `dest' tuple down the file
      %% name path, where it would be silently ignored.
      gen_server:call(?MODULE, {rescan_config_terms, Terms});
  rescan_config(File) when is_list(File) ->
      gen_server:call(?MODULE, {rescan_config, File}).
  %% @doc Add additional configuration elements at runtime.
  %%
  %% Terms is a non-empty list of configuration terms, for example
  %% {Key, Value} pairs or {dest, Host, Port, MaxSess, MaxPipe, Options}
  %% tuples. Existing configuration is kept; entries with the same key
  %% are overwritten.
  add_config([Term | _] = Terms) when is_tuple(Term) ->
      %% Accept any tuple at the head, not just 2-tuples, so that a list
      %% starting with a `dest' tuple is not rejected with function_clause.
      gen_server:call(?MODULE, {add_config_terms, Terms}).
  755. %%====================================================================
  756. %% Server functions
  757. %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: init/1
  %% Description: Initiates the server. Traps exits, records the trace
  %%              settings in the process dictionary, creates the named
  %%              ETS tables ibrowse_lb, ibrowse_conf and ibrowse_stream,
  %%              and loads the configuration file.
  %% Returns: {ok, State}
  %%--------------------------------------------------------------------
  init(_Args) ->
      process_flag(trap_exit, true),
      Initial_state = #state{},
      put(my_trace_flag, Initial_state#state.trace),
      put(ibrowse_trace_token, "ibrowse"),
      Tables = [{ibrowse_lb, [named_table, public, {keypos, 2}]},
                {ibrowse_conf, [named_table, protected, {keypos, 2}]},
                {ibrowse_stream, [named_table, public]}],
      lists:foreach(fun({Name, Options}) ->
                            Name = ets:new(Name, Options)
                    end, Tables),
      import_config(),
      {ok, Initial_state}.
  %% Loads ibrowse.conf from the priv directory of the ibrowse
  %% application. Does nothing (and returns ok) when the priv directory
  %% cannot be located.
  import_config() ->
      case code:priv_dir(ibrowse) of
          Priv_dir when is_list(Priv_dir) ->
              import_config(filename:join(Priv_dir, "ibrowse.conf"));
          {error, _} ->
              ok
      end.
  %% Reads configuration terms from Filename with file:consult/1 and
  %% applies them, replacing the current configuration. When the file
  %% cannot be consulted the current configuration is left untouched and
  %% ok is returned.
  import_config(Filename) ->
      Consulted = file:consult(Filename),
      case Consulted of
          {ok, Config_terms} ->
              apply_config(Config_terms);
          _Unreadable ->
              ok
      end.
  %% Replaces the whole configuration: empties the ibrowse_conf table and
  %% then inserts Config_terms.
  apply_config(Config_terms) ->
      ets:delete_all_objects(ibrowse_conf),
      insert_config(Config_terms).
  %% Inserts every configuration term of Terms into the ibrowse_conf
  %% table, leaving existing entries with other keys in place.
  insert_config(Terms) ->
      lists:foreach(fun insert_config_term/1, Terms).

  %% Stores a single configuration term.
  %%   {dest, Host, Port, MaxSess, MaxPipe, Options} - stored as three
  %%       entries keyed {max_sessions, Host, Port},
  %%       {max_pipeline_size, Host, Port} and {options, Host, Port}
  %%   {Key, Value} - stored as is
  %% Anything else is reported on standard output and skipped.
  insert_config_term({dest, Host, Port, MaxSess, MaxPipe, Options})
    when is_list(Host), is_integer(Port),
         is_integer(MaxSess), MaxSess > 0,
         is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
      set_config_value({max_sessions, Host, Port}, MaxSess),
      set_config_value({max_pipeline_size, Host, Port}, MaxPipe),
      set_config_value({options, Host, Port}, Options),
      ok;
  insert_config_term({Key, Value}) ->
      set_config_value(Key, Value);
  insert_config_term(Unrecognised) ->
      io:format("Skipping unrecognised term: ~p~n", [Unrecognised]).
  %% @doc Internal export. Returns the value stored under Key in the
  %% ibrowse_conf table. Throws {error, ibrowse_not_running} when the
  %% lookup fails with badarg; fails with a badmatch error when no value
  %% is stored under Key.
  get_config_value(Key) ->
      Found = try
                  ets:lookup(ibrowse_conf, Key)
              catch
                  error:badarg ->
                      throw({error, ibrowse_not_running})
              end,
      [#ibrowse_conf{value = Value}] = Found,
      Value.
  %% @doc Internal export. Returns the value stored under Key in the
  %% ibrowse_conf table, or DefVal when nothing is stored under Key.
  %% Throws {error, ibrowse_not_running} when the lookup fails with
  %% badarg.
  get_config_value(Key, DefVal) ->
      Found = try
                  ets:lookup(ibrowse_conf, Key)
              catch
                  error:badarg ->
                      throw({error, ibrowse_not_running})
              end,
      case Found of
          [#ibrowse_conf{value = Value}] ->
              Value;
          [] ->
              DefVal
      end.
  %% Stores Val under Key in the ibrowse_conf table, replacing any
  %% previous entry for that key.
  set_config_value(Key, Val) ->
      Entry = #ibrowse_conf{key = Key, value = Val},
      ets:insert(ibrowse_conf, Entry).
  %%--------------------------------------------------------------------
  %% Function: handle_call/3
  %% Description: Handling call messages
  %%   {get_lb_pid, Url}            - replies with the pid of the load
  %%                                  balancer for the URL's host and port
  %%   stop                         - stops every load balancer listed in
  %%                                  ibrowse_lb, then stops this server
  %%   {set_config_value, Key, Val} - stores one configuration value
  %%   rescan_config                - reloads the default config file
  %%   {rescan_config, File}        - reloads configuration from File
  %%   {rescan_config_terms, Terms} - replaces configuration with Terms
  %%   {add_config_terms, Terms}    - adds Terms to the configuration
  %%   anything else                - replies {unknown_request, Request}
  %% Returns: {reply, Reply, State} |
  %%          {stop, Reason, Reply, State} (terminate/2 is called)
  %%--------------------------------------------------------------------
  handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
      Registered = ets:lookup(ibrowse_lb, {Host, Port}),
      {reply, do_get_connection(Url, Registered), State};
  handle_call(stop, _From, State) ->
      do_trace("IBROWSE shutting down~n", []),
      Stop_lb = fun(#lb_pid{pid = Lb_pid}, Acc) ->
                        ibrowse_lb:stop(Lb_pid),
                        Acc
                end,
      ets:foldl(Stop_lb, [], ibrowse_lb),
      {stop, normal, ok, State};
  handle_call({set_config_value, Key, Val}, _From, State) ->
      set_config_value(Key, Val),
      {reply, ok, State};
  handle_call(rescan_config, _From, State) ->
      %% Exceptions are turned into the reply instead of crashing the server.
      {reply, (catch import_config()), State};
  handle_call({rescan_config, File}, _From, State) ->
      {reply, (catch import_config(File)), State};
  handle_call({rescan_config_terms, Terms}, _From, State) ->
      {reply, (catch apply_config(Terms)), State};
  handle_call({add_config_terms, Terms}, _From, State) ->
      {reply, (catch insert_config(Terms)), State};
  handle_call(Request, _From, State) ->
      {reply, {unknown_request, Request}, State}.
  %%--------------------------------------------------------------------
  %% Function: handle_cast/2
  %% Description: Handling cast messages. No casts are acted upon; every
  %%              message is ignored and the state is kept as is.
  %% Returns: {noreply, State}
  %%--------------------------------------------------------------------
  handle_cast(_Ignored_cast, Server_state) ->
      {noreply, Server_state}.
  %%--------------------------------------------------------------------
  %% Function: handle_info/2
  %% Description: Handling all non call/cast messages
  %%   all_trace_off             - tells the load balancer of every
  %%                               destination whose stored trace setting
  %%                               is `true' to stop tracing, then deletes
  %%                               those trace settings
  %%   {trace, Bool}             - sets the trace flag of this process
  %%   {trace, Bool, Host, Port} - forwards the trace flag to the load
  %%                               balancer of Host:Port and stores it
  %%                               in ibrowse_conf
  %%   anything else             - ignored
  %% Returns: {noreply, State}
  %%--------------------------------------------------------------------
  handle_info(all_trace_off, State) ->
      Traced_entry = {ibrowse_conf, {trace, '$1', '$2'}, true},
      Traced_dests = ets:select(ibrowse_conf,
                                [{Traced_entry, [], [{{'$1', '$2'}}]}]),
      Untrace = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) ->
                        case lists:member({H, P}, Traced_dests) of
                            true ->
                                catch Pid ! {trace, false};
                            false ->
                                ok
                        end;
                   (_, Acc) ->
                        Acc
                end,
      ets:foldl(Untrace, undefined, ibrowse_lb),
      ets:select_delete(ibrowse_conf, [{Traced_entry, [], [true]}]),
      {noreply, State};
  handle_info({trace, Bool}, State) ->
      put(my_trace_flag, Bool),
      {noreply, State};
  handle_info({trace, Bool, Host, Port}, State) ->
      Forward = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _)
                      when H == Host,
                           P == Port ->
                        catch Pid ! {trace, Bool};
                   (_, Acc) ->
                        Acc
                end,
      ets:foldl(Forward, undefined, ibrowse_lb),
      Setting = #ibrowse_conf{key = {trace, Host, Port}, value = Bool},
      ets:insert(ibrowse_conf, Setting),
      {noreply, State};
  handle_info(_Info, State) ->
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% Function: terminate/2
  %% Description: Shutdown the server. Nothing is cleaned up here.
  %% Returns: ok (ignored by gen_server)
  %%--------------------------------------------------------------------
  terminate(_Shutdown_reason, _Server_state) ->
      ok.
  %%--------------------------------------------------------------------
  %% Func: code_change/3
  %% Purpose: Convert process state when code is changed. The state is
  %%          passed through unchanged.
  %% Returns: {ok, NewState}
  %%--------------------------------------------------------------------
  code_change(_Previous_vsn, Server_state, _Extra_data) ->
      {ok, Server_state}.
  941. %%--------------------------------------------------------------------
  942. %%% Internal functions
  943. %%--------------------------------------------------------------------
  %% Returns the pid of the load balancer for the given URL. The second
  %% argument is the result of looking the destination up in ibrowse_lb:
  %% an existing entry supplies the pid, while an empty result causes a
  %% new load balancer to be started (and linked) for the URL's host and
  %% port.
  do_get_connection(_Url, [#lb_pid{pid = Existing_pid}]) ->
      Existing_pid;
  do_get_connection(#url{host = Host, port = Port}, []) ->
      {ok, New_pid} = ibrowse_lb:start_link([Host, Port]),
      New_pid.