You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2199 line
91 KiB

14 年之前
13 年之前
13 年之前
13 年之前
14 年之前
14 年之前
13 年之前
14 年之前
14 年之前
14 年之前
13 年之前
9 年之前
9 年之前
14 年之前
14 年之前
14 年之前
11 年之前
11 年之前
11 年之前
11 年之前
11 年之前
11 年之前
14 年之前
11 年之前
11 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
13 年之前
13 年之前
11 年之前
14 年之前
14 年之前
14 年之前
Use Erlang's OTP base64 module (available since R12B02) and avoid duplicated base64 encoding/decoding code in ibrowse_lib.erl and ibrowse_http_client.erl. OTP's base64 module is also more efficient (C implementation): 1> Data = crypto:rand_bytes(4096). <<205,174,13,169,97,159,110,161,71,43,226,153,42,101,243, 83,11,96,23,161,253,251,129,240,163,216,58,175,190,...>> 2> 2> timer:tc(ibrowse_lib, encode_base64, [Data]). {2920, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 3> timer:tc(ibrowse_lib, encode_base64, [Data]). {1221, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 4> timer:tc(ibrowse_lib, encode_base64, [Data]). {1436, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 5> timer:tc(ibrowse_lib, encode_base64, [Data]). {1195, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 6> 6> timer:tc(base64, encode, [Data]). {1846, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 7> timer:tc(base64, encode, [Data]). {743, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 8> timer:tc(base64, encode, [Data]). {737, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 9> timer:tc(base64, encode, [Data]). {656, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>}
15 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
13 年之前
14 年之前
14 年之前
9 年之前
13 年之前
13 年之前
9 年之前
13 年之前
14 年之前
13 年之前
13 年之前
13 年之前
13 年之前
13 年之前
14 年之前
14 年之前
14 年之前
14 年之前
13 年之前
8 年之前
8 年之前
14 年之前
14 年之前
13 年之前
14 年之前
14 年之前
14 年之前
11 年之前
  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse_http_client.erl
  3. %%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description : The name says it all
  5. %%%
  6. %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. -module(ibrowse_http_client).
  9. -behaviour(gen_server).
  10. %%--------------------------------------------------------------------
  11. %% Include files
  12. %%--------------------------------------------------------------------
  13. %%--------------------------------------------------------------------
  14. %% External exports
  15. -export([
  16. start_link/1,
  17. start_link/2,
  18. start/1,
  19. start/2,
  20. stop/1,
  21. send_req/7
  22. ]).
  23. -ifdef(debug).
  24. -compile(export_all).
  25. -endif.
  26. %% gen_server callbacks
  27. -export([
  28. init/1,
  29. handle_call/3,
  30. handle_cast/2,
  31. handle_info/2,
  32. terminate/2,
  33. code_change/3
  34. ]).
  35. -include("ibrowse.hrl").
  36. -include_lib("kernel/include/inet.hrl").
  37. -record(state, {host, port, connect_timeout,
  38. inactivity_timer_ref,
  39. use_proxy = false, proxy_auth_basic,
  40. ssl_options = [], is_ssl = false, socket,
  41. proxy_tunnel_setup = false,
  42. tunnel_setup_queue = [],
  43. reqs=queue:new(), cur_req, status=idle, http_status_code,
  44. reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
  45. recvd_headers=[],
  46. status_line, raw_headers,
  47. is_closing, content_length,
  48. deleted_crlf = false, transfer_encoding,
  49. chunk_size, chunk_size_buffer = <<>>,
  50. recvd_chunk_size, interim_reply_sent = false,
  51. lb_ets_tid, cur_pipeline_size = 0, prev_req_id,
  52. proc_state
  53. }).
  54. -record(request, {url, method, options, from,
  55. stream_to, caller_controls_socket = false,
  56. caller_socket_options = [],
  57. req_id,
  58. stream_full_chunks = false,
  59. stream_chunk_size,
  60. save_response_to_file = false,
  61. tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
  62. response_format, timer_ref, raw_req}).
  63. -import(ibrowse_lib, [
  64. get_value/2,
  65. get_value/3,
  66. do_trace/2
  67. ]).
  68. -define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
  69. -define(dec2hex(X), erlang:integer_to_list(X, 16)).
  70. %% Macros to prevent spelling mistakes causing bugs
  71. -define(dont_retry_pipelined_requests, dont_retry_pipelined_requests).
  72. -define(can_retry_pipelined_requests, can_retry_pipelined_requests).
  73. -define(dead_proc_walking, dead_proc_walking).
  74. %%====================================================================
  75. %% External functions
  76. %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: start/1, start/2
  %% Description: Starts the connection process without linking it to
  %%              the caller. Args is handed to init/1 unchanged and
  %%              Options is passed on to gen_server:start/3.
  %%--------------------------------------------------------------------
  start(Args) ->
      gen_server:start(?MODULE, Args, []).

  start(Args, Options) ->
      gen_server:start(?MODULE, Args, Options).
  %% Same as start/1,2 but the new process is linked to the caller
  %% (gen_server:start_link/3).
  start_link(Args) ->
      gen_server:start_link(?MODULE, Args, []).

  start_link(Args, Options) ->
      gen_server:start_link(?MODULE, Args, Options).
  %% Asks the connection process to stop. If the synchronous stop call
  %% times out the process is killed instead. Always returns ok.
  stop(Conn_pid) ->
      Outcome = (catch gen_server:call(Conn_pid, stop)),
      case Outcome of
          {'EXIT', {timeout, _}} ->
              %% The process did not answer in time: force it down.
              exit(Conn_pid, kill);
          _ ->
              ok
      end,
      ok.
  %% Submits a request to the connection process and waits up to Timeout
  %% for the answer. A call timeout is reported as {error, req_timedout}
  %% and a missing process as {error, connection_closed}; any other
  %% outcome of the call is returned as is.
  send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
      Request = {send_req, {Url, Headers, Method, Body, Options, Timeout}},
      Outcome = (catch gen_server:call(Conn_Pid, Request, Timeout)),
      case Outcome of
          {'EXIT', {timeout, _}} ->
              {error, req_timedout};
          {'EXIT', {noproc, _}} ->
              {error, connection_closed};
          _ ->
              Outcome
      end.
  107. %%====================================================================
  108. %% Server functions
  109. %%====================================================================
  %%--------------------------------------------------------------------
  %% Function: init/1
  %% Description: Initiates the server. Accepted arguments:
  %%   {Lb_Tid, #url{}, {SSLOptions, Is_ssl}} - ETS table id, parsed URL
  %%                                            and SSL settings
  %%   Url (a string)                         - parsed with
  %%                                            ibrowse_lib:parse_url/1
  %%   {Host, Port}                           - plain host/port pair
  %% In every successful case the trace token and trace flag are stored
  %% in the process dictionary and the inactivity timer is armed.
  %% Returns: {ok, State} |
  %%          {error, invalid_url} (URL string could not be parsed)
  %%--------------------------------------------------------------------
  init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
      maybe_trap_exits(),
      put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
      put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
      Initial = #state{host = Host,
                       port = Port,
                       ssl_options = SSLOptions,
                       is_ssl = Is_ssl,
                       lb_ets_tid = Lb_Tid},
      {ok, set_inac_timer(Initial)};
  init(Url) when is_list(Url) ->
      maybe_trap_exits(),
      Parsed = (catch ibrowse_lib:parse_url(Url)),
      case Parsed of
          #url{protocol = Protocol} ->
              %% No load-balancer table and no SSL options in this mode;
              %% SSL is switched on purely by the https scheme.
              Is_ssl = (Protocol == https),
              init({undefined, Parsed, {[], Is_ssl}});
          {'EXIT', _} ->
              {error, invalid_url}
      end;
  init({Host, Port}) ->
      maybe_trap_exits(),
      put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
      put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
      Initial = #state{host = Host, port = Port},
      {ok, set_inac_timer(Initial)}.
  %%--------------------------------------------------------------------
  %% Function: handle_call/3
  %% Description: Handling call messages
  %%   {send_req, ...} - rejected with {error, connection_closing} when
  %%                     the connection is closing or the process is
  %%                     already shutting down; otherwise handed to
  %%                     send_req_1/8, which produces the return value
  %%   stop            - closes the socket, fails outstanding requests
  %%                     with closing_on_request and stops normally
  %%   anything else   - answered with {unknown_request, Request}
  %%--------------------------------------------------------------------
  %% The remote server has already sent us a Connection: Close header.
  handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
      {reply, {error, connection_closing}, State};
  %% This process is on its way out; refuse new work.
  handle_call({send_req, _}, _From,
              #state{proc_state = ?dead_proc_walking} = State) ->
      shutting_down(State),
      {reply, {error, connection_closing}, State};
  handle_call({send_req, Req_args}, From, State)
    when tuple_size(Req_args) =:= 6 ->
      {Url, Headers, Method, Body, Options, Timeout} = Req_args,
      send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
  handle_call(stop, _From, State) ->
      do_close(State),
      do_error_reply(State, closing_on_request),
      {stop, normal, ok, State};
  handle_call(Request, _From, State) ->
      {reply, {unknown_request, Request}, State}.
  %%--------------------------------------------------------------------
  %% Function: handle_cast/2
  %% Description: Handling cast messages. This server defines no casts,
  %%              so every cast is dropped and the state is unchanged.
  %% Returns: {noreply, State}
  %%--------------------------------------------------------------------
  handle_cast(_Ignored, State) ->
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% Function: handle_info/2
  %% Description: Handling all non call/cast messages
  %% Returns: {noreply, State} |
  %%          {stop, Reason, State} (terminate/2 is called)
  %%
  %% Most failure paths do not stop the server directly. They mark the
  %% state with proc_state = dead_proc_walking, notify whoever is
  %% waiting, and call delayed_stop_timer/0; the process then stops when
  %% the delayed_stop message arrives.
  %%--------------------------------------------------------------------
  %% Payload received on a plain TCP socket.
  handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
      do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
      handle_sock_data(Data, State);
  %% Payload received on an SSL socket.
  handle_info({ssl, _Sock, Data}, State) ->
      handle_sock_data(Data, State);
  %% The caller streaming the current request asks for more data:
  %% re-arm the socket for one more message.
  handle_info({stream_next, Req_id}, #state{socket = Socket,
                                            cur_req = #request{req_id = Req_id}} = State) ->
      _ = do_setopts(Socket, [{active, once}], State),
      {noreply, set_inac_timer(State)};
  %% stream_next for a request that is not the current one: ignore.
  handle_info({stream_next, _Req_id}, State) ->
      {noreply, State};
  %% The caller wants the stream closed.
  handle_info({stream_close, _Req_id}, State) ->
      State_1 = State#state{proc_state = ?dead_proc_walking},
      shutting_down(State_1),
      do_close(State_1),
      do_error_reply(State_1, closing_on_request),
      delayed_stop_timer(),
      {noreply, State_1};
  %% Peer closed the connection: pipelined requests may be retried.
  handle_info({tcp_closed, _Sock}, State) ->
      do_trace("TCP connection closed by peer!~n", []),
      State_1 = State#state{proc_state = ?dead_proc_walking},
      handle_sock_closed(State_1, ?can_retry_pipelined_requests),
      delayed_stop_timer(),
      {noreply, State_1};
  handle_info({ssl_closed, _Sock}, State) ->
      do_trace("SSL connection closed by peer!~n", []),
      State_1 = State#state{proc_state = ?dead_proc_walking},
      handle_sock_closed(State_1, ?can_retry_pipelined_requests),
      delayed_stop_timer(),
      {noreply, State_1};
  %% Socket error: pipelined requests must not be retried.
  handle_info({tcp_error, _Sock, Reason}, State) ->
      do_trace("Error on connection to ~1000.p:~1000.p -> ~1000.p~n",
               [State#state.host, State#state.port, Reason]),
      State_1 = State#state{proc_state = ?dead_proc_walking},
      handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
      delayed_stop_timer(),
      {noreply, State_1};
  handle_info({ssl_error, _Sock, Reason}, State) ->
      do_trace("Error on SSL connection to ~1000.p:~1000.p -> ~1000.p~n",
               [State#state.host, State#state.port, Reason]),
      State_1 = State#state{proc_state = ?dead_proc_walking},
      handle_sock_closed(State_1, ?dont_retry_pipelined_requests),
      delayed_stop_timer(),
      {noreply, State_1};
  %% Per-request timer fired. If the request is still queued, tell its
  %% stream target, drop it from the queue and fail the remaining
  %% requests with req_timedout.
  handle_info({req_timedout, From}, #state{reqs = Reqs} = State) ->
      Reqs_list = queue:to_list(Reqs),
      case lists:keysearch(From, #request.from, Reqs_list) of
          false ->
              {noreply, State};
          {value, #request{stream_to = StreamTo, req_id = ReqId}} ->
              %% StreamTo may not be a valid destination; the catch keeps
              %% a failing send from crashing this process.
              catch StreamTo ! {ibrowse_async_response_timeout, ReqId},
              State_1 = State#state{proc_state = ?dead_proc_walking},
              shutting_down(State_1),
              Reqs_1 = lists:filter(fun(#request{from = X_from}) ->
                                            X_from /= From
                                    end, Reqs_list),
              State_2 = State_1#state{reqs = queue:from_list(Reqs_1)},
              do_error_reply(State_2, req_timedout),
              delayed_stop_timer(),
              {noreply, State_2}
      end;
  %% Inactivity timer fired.
  handle_info(timeout, State) ->
      do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
      State_1 = State#state{proc_state = ?dead_proc_walking},
      shutting_down(State_1),
      do_error_reply(State_1, req_timedout),
      delayed_stop_timer(),
      {noreply, State_1};
  %% Tracing switched on or off for this connection.
  handle_info({trace, Bool}, State) ->
      put(my_trace_flag, Bool),
      {noreply, State};
  %% Deferred shutdown requested through delayed_stop_timer/0.
  handle_info(delayed_stop, State) ->
      {stop, normal, State};
  %% Anything else is reported on standard output and ignored.
  handle_info(Info, State) ->
      io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
                [State#state.host, State#state.port, Info]),
      io:format("Recvd unknown message ~p when in state: ~p~n", [Info, State]),
      {noreply, State}.
  %%--------------------------------------------------------------------
  %% Function: terminate/2
  %% Description: Shutdown the server. Closes the socket, runs
  %%              shutting_down/1 and removes from the lb_ets_tid table
  %%              every entry whose key is a 3-tuple with this process's
  %%              pid as its third element.
  %% Returns: any (ignored by gen_server)
  %%--------------------------------------------------------------------
  terminate(_Reason, #state{lb_ets_tid = Tid} = State) ->
      do_close(State),
      shutting_down(State),
      Own_entries = [{{{'_', '_', '$1'}, '_'},
                      [{'==', '$1', {const, self()}}],
                      [true]}],
      %% Tid may be undefined or the table may be gone; ignore failures.
      _ = (catch ets:select_delete(Tid, Own_entries)),
      ok.
  %%--------------------------------------------------------------------
  %% Func: code_change/3
  %% Purpose: Convert process state when code is changed. No conversion
  %%          is performed; the state is carried over untouched.
  %% Returns: {ok, NewState}
  %%--------------------------------------------------------------------
  code_change(_FromVsn, Current_state, _Extra_args) ->
      {ok, Current_state}.
  287. %%--------------------------------------------------------------------
  288. %%% Internal functions
  289. %%--------------------------------------------------------------------
  %%--------------------------------------------------------------------
  %% Handles data recvd on the socket, depending on #state.status:
  %%   idle       - no request is outstanding, so the data is unexpected;
  %%                the connection is closed and shut down
  %%   get_header - the data is fed to parse_response/2
  %%   get_body   - the data is accumulated as-is when neither a content
  %%                length nor a chunk size is known, otherwise it is fed
  %%                to parse_11_response/2
  %% Returns: {noreply, State}
  %%--------------------------------------------------------------------
  handle_sock_data(Data, #state{status = idle} = State) ->
      do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
      Dying = State#state{proc_state = ?dead_proc_walking},
      shutting_down(Dying),
      do_error_reply(Dying, data_in_status_idle),
      do_close(Dying),
      delayed_stop_timer(),
      {noreply, Dying};
  handle_sock_data(Data, #state{status = get_header} = State) ->
      case parse_response(Data, State) of
          {error, _Reason} ->
              Dying = State#state{proc_state = ?dead_proc_walking},
              shutting_down(Dying),
              delayed_stop_timer(),
              {noreply, Dying};
          #state{status = get_header,
                 socket = Socket,
                 cur_req = #request{caller_controls_socket = true}} = Parsed ->
              %% Still waiting for headers: keep reading even though the
              %% caller controls the socket.
              _ = do_setopts(Socket, [{active, once}], Parsed),
              {noreply, set_inac_timer(Parsed)};
          #state{} = Parsed ->
              _ = active_once(Parsed),
              {noreply, set_inac_timer(Parsed)}
      end;
  handle_sock_data(Data, #state{status = get_body,
                                socket = Socket,
                                content_length = CL,
                                http_status_code = StatCode,
                                recvd_headers = Headers,
                                chunk_size = CSz} = State) ->
      %% Shared failure path: fail every pipelined request and schedule
      %% the shutdown of this process.
      Abort = fun(Why) ->
                      Dying = State#state{proc_state = ?dead_proc_walking},
                      shutting_down(Dying),
                      fail_pipelined_requests(Dying,
                                              {error, {Why, {stat_code, StatCode}, Headers}}),
                      delayed_stop_timer(),
                      {noreply, Dying}
              end,
      %% Shared success path: re-arm the socket and the inactivity timer.
      Keep_reading = fun(Next) ->
                             _ = active_once(Next),
                             {noreply, set_inac_timer(Next)}
                     end,
      case {CL, CSz} of
          {undefined, undefined} ->
              case accumulate_response(Data, State) of
                  {error, Reason} ->
                      Abort(Reason);
                  Accumulated ->
                      Keep_reading(Accumulated)
              end;
          _ ->
              case parse_11_response(Data, State) of
                  {error, Reason} ->
                      Abort(Reason);
                  #state{cur_req = #request{caller_controls_socket = Ccs},
                         interim_reply_sent = Irs} = Parsed ->
                      _ = case {Irs, Ccs} of
                              {false, true} ->
                                  %% Nothing was handed to the caller yet,
                                  %% so keep reading on its behalf.
                                  do_setopts(Socket, [{active, once}], State);
                              {true, _} ->
                                  active_once(Parsed);
                              {false, _} ->
                                  active_once(Parsed)
                          end,
                      Reset = Parsed#state{interim_reply_sent = false},
                      case Ccs of
                          true ->
                              %% The caller paces the stream, so the
                              %% inactivity timer is switched off.
                              cancel_timer(Reset#state.inactivity_timer_ref,
                                           {eat_message, timeout}),
                              {noreply, Reset#state{inactivity_timer_ref = undefined}};
                          _ ->
                              {noreply, set_inac_timer(Reset)}
                      end;
                  Parsed ->
                      Keep_reading(Parsed)
              end
      end.
  %% Adds freshly received body data to the response being built.
  %%
  %% For 2xx responses that are to be saved to a file, the file is opened
  %% on first use and the buffered plus new data is written to it.
  %% Otherwise the data is appended to the reply buffer and, when a
  %% stream target is set, forwarded as interim replies.
  %%
  %% Returns the updated #state{}, or {error, {file_open_error, Reason}}
  %% / {error, {file_write_error, Reason}}.
  accumulate_response(Data,
                      #state{cur_req = #request{save_response_to_file = Srtf,
                                                tmp_file_fd = undefined} = CurReq,
                             http_status_code = [$2 | _]} = State)
    when Srtf /= false ->
      %% First data for a save-to-file response: open the target file,
      %% then start over with the descriptor in place.
      TmpFilename = make_tmp_filename(Srtf),
      Open_opts = [file_mode(Srtf), delayed_write, raw],
      case file:open(TmpFilename, Open_opts) of
          {ok, Fd} ->
              With_file = CurReq#request{tmp_file_fd = Fd,
                                         tmp_file_name = TmpFilename},
              accumulate_response(Data, State#state{cur_req = With_file});
          {error, Reason} ->
              {error, {file_open_error, Reason}}
      end;
  accumulate_response(Data,
                      #state{cur_req = #request{save_response_to_file = Srtf,
                                                tmp_file_fd = Fd},
                             reply_buffer = Pending,
                             http_status_code = [$2 | _]} = State)
    when Srtf /= false ->
      %% The file is already open (chunked or not): flush whatever was
      %% buffered together with the new data.
      case file:write(Fd, [Pending, Data]) of
          ok ->
              State#state{reply_buffer = <<>>};
          {error, Reason} ->
              {error, {file_write_error, Reason}}
      end;
  accumulate_response(Data, #state{reply_buffer = Pending,
                                   rep_buf_size = RepBufSize,
                                   streamed_size = Streamed_size,
                                   cur_req = CurReq} = State) ->
      #request{stream_to = StreamTo,
               req_id = ReqId,
               stream_chunk_size = Stream_chunk_size,
               response_format = Response_format,
               caller_controls_socket = Caller_controls_socket} = CurReq,
      Buffered = <<Pending/binary, Data/binary>>,
      New_data_size = RepBufSize - Streamed_size,
      if
          StreamTo =:= undefined ->
              %% Not streaming: just keep buffering.
              State#state{reply_buffer = Buffered};
          Caller_controls_socket == true ->
              %% The caller paces the stream: hand over everything now.
              do_interim_reply(StreamTo, Response_format, ReqId, Buffered),
              State#state{reply_buffer = <<>>,
                          interim_reply_sent = true,
                          streamed_size = Streamed_size + size(Buffered)};
          New_data_size >= Stream_chunk_size ->
              %% Send one chunk and run the remainder through again.
              {Stream_chunk, Leftover} = split_binary(Buffered, Stream_chunk_size),
              do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
              Streamed = State#state{reply_buffer = <<>>,
                                     interim_reply_sent = true,
                                     streamed_size = Streamed_size + Stream_chunk_size},
              case Leftover of
                  <<>> ->
                      Streamed;
                  _ ->
                      accumulate_response(Leftover, Streamed)
              end;
          true ->
              State#state{reply_buffer = Buffered}
      end.
  %% Returns a 3-tuple of integers that is unique on this node.
  %%
  %% Where erlang:unique_integer/1 exists the result is
  %% {Seconds, MicroSeconds, UniqueInteger}, built from os:timestamp/0.
  %% On releases without it the call fails and erlang:now/0 is used
  %% instead; it is invoked through apply/3 so the compiler does not warn
  %% about the deprecated BIF.
  generate_timestamp() ->
      case catch erlang:unique_integer([positive]) of
          {'EXIT', _} ->
              erlang:apply(erlang, now, []);
          Unique ->
              {A, B, C} = os:timestamp(),
              {A * 1000000 + B, C, Unique}
      end.

  %% Chooses the file a response is saved to, from the value of the
  %% save_response_to_file setting:
  %%   true           - a fresh "ibrowse_tmp_file_<digits>" name inside
  %%                    the configured download_dir (default: the current
  %%                    directory)
  %%   File           - the given file name
  %%   {append, File} - the given file name (file_mode/1 selects append)
  make_tmp_filename(true) ->
      DownloadDir = ibrowse:get_config_value(download_dir, filename:absname("./")),
      %% os:timestamp/0 alone is not guaranteed to be unique, so two
      %% connections starting a download in the same microsecond could
      %% pick the same file. generate_timestamp/0 adds a unique integer.
      {A, B, C} = generate_timestamp(),
      filename:join([DownloadDir,
                     "ibrowse_tmp_file_" ++
                         integer_to_list(A) ++
                         integer_to_list(B) ++
                         integer_to_list(C)]);
  make_tmp_filename(File) when is_list(File) ->
      File;
  make_tmp_filename({append, File}) when is_list(File) ->
      File.
  %% Open mode for the file a response is saved to: append when the
  %% setting is {append, File}, write for every other value.
  file_mode(Srtf) ->
      case Srtf of
          {append, _File} ->
              append;
          _ ->
              write
      end.
  %%--------------------------------------------------------------------
  %% Handles the case when the server closes the socket.
  %% The second argument says whether pipelined requests may be retried
  %% by the caller (?can_retry_pipelined_requests) or not
  %% (?dont_retry_pipelined_requests); it selects the error reason
  %% (connection_closed vs. connection_closed_no_retry) reported for them.
  %%--------------------------------------------------------------------
  %% Closed while still waiting for response headers.
  handle_sock_closed(#state{status = get_header} = State, _) ->
      shutting_down(State),
      do_error_reply(State, connection_closed_no_retry);
  %% Closed with no request in progress.
  handle_sock_closed(#state{cur_req = undefined} = State, _) ->
      shutting_down(State);
  %% We check for IsClosing because the server could have sent a
  %% Connection-Close header and closed the socket to indicate the end
  %% of the response. There may be requests pipelined which need a
  %% response.
  handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC,
                            is_closing = IsClosing,
                            cur_req = #request{tmp_file_name = TmpFilename,
                                               tmp_file_fd = Fd} = CurReq,
                            status = get_body,
                            recvd_headers = Headers,
                            status_line = Status_line,
                            raw_headers = Raw_headers
                           } = State, Retry_state) ->
      #request{from = From, stream_to = StreamTo, req_id = ReqId,
               response_format = Resp_format,
               options = Options,
               raw_req = Raw_req
              } = CurReq,
      %% Fails every request still queued in the given state.
      Fail_queued = fun(Queue_state) ->
                            case Retry_state of
                                ?dont_retry_pipelined_requests ->
                                    ok = do_error_reply(Queue_state, connection_closed_no_retry);
                                ?can_retry_pipelined_requests ->
                                    ok = do_error_reply(Queue_state, connection_closed)
                            end
                    end,
      case IsClosing of
          true ->
              %% The close marks the end of the current response: deliver
              %% it, then fail whatever is queued behind it.
              {_, Reqs_1} = queue:out(Reqs),
              Body = case TmpFilename of
                         undefined ->
                             Buf;
                         _ ->
                             ok = file:close(Fd),
                             {file, TmpFilename}
                     end,
              Give_raw_req = get_value(return_raw_request, Options, false),
              Give_raw_headers = get_value(give_raw_headers, Options, false),
              Reply = case {Give_raw_headers, Give_raw_req} of
                          {true, false} ->
                              {ok, Status_line, Raw_headers, Body};
                          {true, _} ->
                              {ok, Status_line, Raw_headers, Body, Raw_req};
                          {false, false} ->
                              {ok, SC, Headers, Body};
                          {false, _} ->
                              {ok, SC, Headers, Body, Raw_req}
                      end,
              State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
              Fail_queued(State_1#state{reqs = Reqs_1}),
              State_1;
          _ ->
              Fail_queued(State),
              State
      end.
  %% Opens the socket for this connection.
  %%
  %% First clause: TLS without an HTTP proxy. Without a socks5_host
  %% option the TLS connection is made directly. With one, a socket is
  %% first opened through ibrowse_socks5 and then upgraded to TLS.
  %% Second clause: everything else. A plain TCP socket is opened,
  %% directly or through ibrowse_socks5 when socks5_host is set.
  %%
  %% Returns {ok, Socket} on success. On failure the term produced by the
  %% underlying connect call is returned unchanged, so the caller can
  %% report the actual reason.
  do_connect(Host, Port, Options, #state{is_ssl = true,
                                         use_proxy = false,
                                         ssl_options = SSLOptions},
             Timeout) ->
      %% if a socks5 proxy is configured, open the socket separately
      %% before upgrading the socket to a TLS connection.
      case get_value(socks5_host, Options, undefined) of
          %% no socks5 proxy is configured, connect directly with TLS:
          undefined ->
              Sock_options = get_sock_options(Host, Options, SSLOptions),
              ssl:connect(Host, Port, Sock_options, Timeout);
          %% proxy configuration is present: first establish a socket
          %% and then upgrade:
          _ ->
              Sock_options = get_sock_options(Host, Options, []),
              case ibrowse_socks5:connect(Host, Port, Options,
                                          Sock_options, Timeout) of
                  {ok, Sock} ->
                      ssl:connect(Sock, SSLOptions, Timeout);
                  Socks5_failure ->
                      %% Pass the failure on verbatim (as the non-TLS
                      %% clause does) instead of collapsing it to the
                      %% bare atom 'error', which hid the reason.
                      Socks5_failure
              end
      end;
  do_connect(Host, Port, Options, _State, Timeout) ->
      Socks5Host = get_value(socks5_host, Options, undefined),
      Sock_options = get_sock_options(Host, Options, []),
      case Socks5Host of
          undefined ->
              gen_tcp:connect(Host, Port, Sock_options, Timeout);
          _ ->
              catch ibrowse_socks5:connect(Host, Port, Options, Sock_options, Timeout)
      end.
  %% Builds the option list used to open the socket.
  %%
  %% The given SSL options, the caller's socket_options and - when
  %% prefer_ipv6 is set and Host is an IPv6 host - inet6 are combined and
  %% stripped of the options the caller may not set. binary and
  %% {active, false} are always put in front, and {nodelay, true} as well
  %% unless a nodelay option is already present.
  get_sock_options(Host, Options, SSLOptions) ->
      Caller_socket_options = get_value(socket_options, Options, []),
      Ipv6Options =
          case get_value(prefer_ipv6, Options, false) of
              false ->
                  [];
              true ->
                  case is_ipv6_host(Host) of
                      true -> [inet6];
                      false -> []
                  end
          end,
      Allowed = filter_sock_options(SSLOptions ++ Caller_socket_options ++ Ipv6Options),
      Base = [binary, {active, false} | Allowed],
      case lists:keymember(nodelay, 1, Allowed) of
          true ->
              Base;
          false ->
              [{nodelay, true} | Base]
      end.
  %% Tells whether Host should be treated as an IPv6 host.
  %%
  %% A literal IPv6 address gives true and a literal IPv4 address gives
  %% false. Anything else is looked up with inet:gethostbyname/2 for the
  %% inet6 family; the answer is true only if that lookup succeeds with
  %% an inet6 address type.
  is_ipv6_host(Host) ->
      case inet_parse:address(Host) of
          {ok, Addr} when tuple_size(Addr) =:= 8 ->
              true;
          {ok, Addr} when tuple_size(Addr) =:= 4 ->
              false;
          _ ->
              %% Not an address literal: ask the resolver.
              case inet:gethostbyname(Host, inet6) of
                  {ok, #hostent{h_addrtype = inet6}} ->
                      true;
                  _ ->
                      false
              end
      end.
  %% We don't want the caller to specify certain options: {active, _},
  %% {packet, _} and list are removed, everything else is kept in its
  %% original order.
  filter_sock_options(Opts) ->
      Is_reserved = fun({active, _}) -> true;
                       ({packet, _}) -> true;
                       (list) -> true;
                       (_) -> false
                    end,
      [Opt || Opt <- Opts, not Is_reserved(Opt)].
  %% Writes Req to the connection's socket.
  %%
  %% On an SSL connection through a proxy the socket is still a plain TCP
  %% socket until the tunnel is set up (proxy_tunnel_setup == done), so
  %% gen_tcp is used until then. Otherwise ssl or gen_tcp is chosen by
  %% the is_ssl flag.
  do_send(Req, #state{socket = Sock,
                      is_ssl = Is_ssl,
                      use_proxy = Use_proxy,
                      proxy_tunnel_setup = Pts}) ->
      case Is_ssl of
          true when Use_proxy =:= true, Pts /= done ->
              gen_tcp:send(Sock, Req);
          true ->
              ssl:send(Sock, Req);
          false ->
              gen_tcp:send(Sock, Req)
      end.
  %% Sends the request body.
  %%
  %% The body may be produced by a fun - given as Fun, {Fun} or
  %% {Fun, InitialState} - in which case it is pulled piece by piece via
  %% generate_body/1 and sent by do_send_body_1/4. Any other body is sent
  %% in one go.
  %%
  %% Returns {ok, SentBody} or whatever the failing send returned.
  do_send_body(Source, State, TE) when is_function(Source) ->
      do_send_body_1(generate_body(Source), State, TE, []);
  do_send_body({Source}, State, TE) when is_function(Source) ->
      do_send_body_1(generate_body(Source), State, TE, []);
  do_send_body({Source, Source_state}, State, TE) when is_function(Source) ->
      First = generate_body({Source, Source_state}),
      do_send_body_1(First, State, TE, []);
  do_send_body(Body, State, _TE) ->
      Sent = do_send(Body, State),
      case Sent of
          ok ->
              {ok, Body};
          _ ->
              Sent
      end.
  %% Pulls the next piece of a request body from a source fun.
  %%
  %% Accepts either {Fun, State}, where Fun is called with State, or a
  %% bare Fun, which is called without arguments. Returns {Result, Fun},
  %% where Result is whatever the fun returned.
  %%
  %% The fun itself is always the second element, also when a stateful
  %% source returns something unexpected. do_send_body_1/4 only accepts a
  %% fun in that position; pairing the result with {Fun, State} instead
  %% made it fail with function_clause rather than hand the unexpected
  %% result back to its caller.
  generate_body({Source, Source_state}) when is_function(Source) ->
      {Source(Source_state), Source};
  generate_body(Source) when is_function(Source) ->
      {Source(), Source}.
  %% Drives a body-generating fun until it reports eof.
  %%
  %% With chunked transfer encoding (TE == true) each non-empty piece is
  %% sent as soon as it is produced and the terminating zero-length chunk
  %% is sent at eof; the result is then {ok, []}. Without it the pieces
  %% are collected in Acc (newest first) and sent as one binary at eof;
  %% the result is then {ok, Body}. Empty pieces are skipped. Any other
  %% result from the source is returned unchanged.
  do_send_body_1({Resp, Source}, State, TE, Acc) when is_function(Source) ->
      %% Send the piece right away (chunked) or stash it for later.
      Absorb = fun(Piece) ->
                       case TE of
                           true ->
                               ok = do_send(maybe_chunked_encode(Piece, TE), State),
                               Acc;
                           false ->
                               [Piece | Acc]
                       end
               end,
      %% The source is exhausted: terminate the chunked stream or send
      %% everything that was collected.
      Finish = fun() ->
                       case TE of
                           true ->
                               ok = do_send(<<"0\r\n\r\n">>, State),
                               {ok, []};
                           _ ->
                               Whole = list_to_binary(lists:reverse(Acc)),
                               ok = do_send(Whole, State),
                               {ok, Whole}
                       end
               end,
      case Resp of
          {ok, Data} when Data == []; Data == <<>> ->
              do_send_body_1(generate_body(Source), State, TE, Acc);
          {ok, Data} ->
              %% Absorb before asking the source for the next piece.
              Acc_1 = Absorb(Data),
              do_send_body_1(generate_body(Source), State, TE, Acc_1);
          {ok, Data, New_source_state} when Data == []; Data == <<>> ->
              do_send_body_1(generate_body({Source, New_source_state}), State, TE, Acc);
          {ok, Data, New_source_state} ->
              Acc_1 = Absorb(Data),
              do_send_body_1(generate_body({Source, New_source_state}), State, TE, Acc_1);
          {eof, _New_source_state} ->
              Finish();
          eof ->
              Finish();
          Err ->
              Err
      end.
  %% Wraps Data as one HTTP chunk (size in hex, CRLF, data, CRLF) when
  %% chunked encoding is on; returns Data untouched when it is off.
  maybe_chunked_encode(Data, true) ->
      Size_in_hex = ?dec2hex(iolist_size(Data)),
      [Size_in_hex, "\r\n", Data, "\r\n"];
  maybe_chunked_encode(Data, false) ->
      Data.
  %% Closes the connection's socket, if there is one. Failures while
  %% closing are caught.
  %%
  %% On an SSL connection through a proxy the socket is closed with
  %% gen_tcp until the tunnel is set up (proxy_tunnel_setup == done).
  do_close(#state{socket = undefined}) ->
      ok;
  do_close(#state{socket = Sock,
                  is_ssl = Is_ssl,
                  use_proxy = Use_proxy,
                  proxy_tunnel_setup = Pts}) ->
      case Is_ssl of
          true when Use_proxy =:= true, Pts /= done ->
              catch gen_tcp:close(Sock);
          true ->
              catch ssl:close(Sock);
          false ->
              catch gen_tcp:close(Sock)
      end.
  %% Re-arms the socket to deliver one more message, unless the current
  %% request's caller controls the socket, in which case nothing is done.
  active_once(#state{cur_req = Cur_req, socket = Socket} = State) ->
      case Cur_req of
          #request{caller_controls_socket = true} ->
              ok;
          _ ->
              do_setopts(Socket, [{active, once}], State)
      end.
  %% Applies socket options. An empty option list is a no-op.
  %%
  %% ssl:setopts/2 is used on an SSL connection, except through a proxy
  %% while the tunnel is not yet set up (proxy_tunnel_setup /= done);
  %% inet:setopts/2 is used in every other case.
  do_setopts(_Sock, [], _) ->
      ok;
  do_setopts(Sock, Opts, State) ->
      case State of
          #state{is_ssl = true,
                 use_proxy = true,
                 proxy_tunnel_setup = Pts} when Pts /= done ->
              inet:setopts(Sock, Opts);
          #state{is_ssl = true} ->
              ssl:setopts(Sock, Opts);
          _ ->
              inet:setopts(Sock, Opts)
      end.
  %% Turns SSL on for this connection when the request options carry
  %% is_ssl = true, taking the SSL options from the ssl_options entry.
  %% The state is returned unchanged when is_ssl is false or absent.
  check_ssl_options(Options, State) ->
      case get_value(is_ssl, Options, false) of
          true ->
              Ssl_opts = get_value(ssl_options, Options),
              State#state{is_ssl = true, ssl_options = Ssl_opts};
          false ->
              State
      end.
  %% Sends one request over this connection and returns {noreply, State}.
  %%
  %% The clauses are tried in this order:
  %%   1. No socket yet: connect (to the proxy when `proxy_host` is given),
  %%      then re-enter with the connected state.
  %%   2. SSL through a proxy with no tunnel yet: send a CONNECT request and
  %%      park the real request in `tunnel_setup_queue`.
  %%   3. Tunnel setup in progress: queue the request.
  %%   4. Otherwise: build and send the request, and enqueue it in `reqs`.
  %%
  %% On a connect or send failure the caller gets {error, {Tag, Reason}}
  %% and the connection is marked ?dead_proc_walking.
  send_req_1(From,
             #url{host = Host,
                  port = Port} = Url,
             Headers, Method, Body, Options, Timeout,
             #state{socket = undefined} = State) ->
      {Host_1, Port_1, State_1} =
          case get_value(proxy_host, Options, false) of
              false ->
                  {Host, Port, State};
              PHost ->
                  ProxyUser     = get_value(proxy_user, Options, []),
                  ProxyPassword = get_value(proxy_password, Options, []),
                  AuthBasic     = http_auth_basic(ProxyUser, ProxyPassword),
                  {PHost, get_value(proxy_port, Options, 80),
                   State#state{use_proxy = true,
                               proxy_auth_basic = AuthBasic}}
          end,
      State_2 = check_ssl_options(Options, State_1),
      do_trace("Connecting...~n", []),
      Conn_timeout = get_value(connect_timeout, Options, Timeout),
      case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
          {ok, Sock} ->
              do_trace("Connected! Socket: ~1000.p~n", [Sock]),
              State_3 = State_2#state{socket = Sock,
                                      connect_timeout = Conn_timeout},
              send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
          Err ->
              send_req_abort(From, conn_failed, Err,
                             "Error connecting. Reason: ~1000.p~n", State_2)
      end;

  %% Send a CONNECT request.
  %% Wait for 200 OK
  %% Upgrade to SSL connection
  %% Then send request
  send_req_1(From,
             #url{host = Server_host,
                  port = Server_port} = Url,
             Headers, Method, Body, Options, Timeout,
             #state{proxy_tunnel_setup = false,
                    use_proxy = true,
                    is_ssl = true} = State) ->
      Ref = send_req_start_timer(Timeout, From),
      NewReq = #request{
                  method = connect,
                  preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
                  options = Options,
                  timer_ref = Ref
                 },
      State_1 = State#state{reqs = queue:in(NewReq, State#state.reqs)},
      Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
      Path = [Server_host, $:, integer_to_list(Server_port)],
      {Req, Body_1} = make_request(connect, Pxy_auth_headers,
                                   Path, Path,
                                   [], Options, State_1, undefined),
      TE = is_chunked_encoding_specified(Options),
      trace_request(Req),
      case do_send(Req, State) of
          ok ->
              case do_send_body(Body_1, State_1, TE) of
                  {ok, _Sent_body} ->
                      trace_request_body(Body_1),
                      _ = active_once(State_1),
                      State_1_1 = inc_pipeline_counter(State_1),
                      %% The caller's request waits here until the tunnel is up.
                      State_2 = State_1_1#state{status = get_header,
                                                cur_req = NewReq,
                                                proxy_tunnel_setup = in_progress,
                                                tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
                      State_3 = set_inac_timer(State_2),
                      {noreply, State_3};
                  Err ->
                      send_req_abort(From, send_failed, Err,
                                     "Send failed... Reason: ~p~n", State_1)
              end;
          Err ->
              send_req_abort(From, send_failed, Err,
                             "Send failed... Reason: ~p~n", State_1)
      end;

  send_req_1(From, Url, Headers, Method, Body, Options, Timeout,
             #state{proxy_tunnel_setup = in_progress,
                    tunnel_setup_queue = Q} = State) ->
      %% Url is a #url{} record here, so it is printed with ~p; ~s only
      %% accepts strings, binaries and atoms.
      do_trace("Queued SSL request awaiting tunnel setup: ~n"
               "URL : ~p~n"
               "Method : ~p~n"
               "Headers : ~p~n", [Url, Method, Headers]),
      {noreply, State#state{tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout} | Q]}};

  send_req_1(From,
             #url{abspath = AbsPath,
                  path    = RelPath} = Url,
             Headers, Method, Body, Options, Timeout,
             #state{status = Status,
                    socket = Socket} = State) ->
      cancel_timer(State#state.inactivity_timer_ref, {eat_message, timeout}),
      ReqId = make_req_id(),
      Resp_format = get_value(response_format, Options, list),
      Caller_socket_options = get_value(socket_options, Options, []),
      {StreamTo, Caller_controls_socket} =
          case get_value(stream_to, Options, undefined) of
              {Caller, once} when is_pid(Caller) or
                                  is_atom(Caller) ->
                  %% Record which connection process serves this request id.
                  Async_pid_rec = {{req_id_pid, ReqId}, self()},
                  true = ets:insert(ibrowse_stream, Async_pid_rec),
                  {Caller, true};
              undefined ->
                  {undefined, false};
              Caller when is_pid(Caller) or
                          is_atom(Caller) ->
                  {Caller, false};
              Stream_to_inv ->
                  exit({invalid_option, {stream_to, Stream_to_inv}})
          end,
      SaveResponseToFile = get_value(save_response_to_file, Options, false),
      Ref = send_req_start_timer(Timeout, From),
      Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State),
      {Req, Body_1} = make_request(Method,
                                   Headers_1,
                                   AbsPath, RelPath, Body, Options, State,
                                   ReqId),
      NewReq = #request{url = Url,
                        method = Method,
                        stream_to = StreamTo,
                        caller_controls_socket = Caller_controls_socket,
                        caller_socket_options = Caller_socket_options,
                        options = Options,
                        req_id = ReqId,
                        save_response_to_file = SaveResponseToFile,
                        stream_full_chunks = get_value(stream_full_chunks, Options, false),
                        stream_chunk_size = get_stream_chunk_size(Options),
                        response_format = Resp_format,
                        from = From,
                        preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
                        timer_ref = Ref
                       },
      trace_request(Req),
      ok = do_setopts(Socket, Caller_socket_options, State),
      TE = is_chunked_encoding_specified(Options),
      case do_send(Req, State) of
          ok ->
              case do_send_body(Body_1, State, TE) of
                  {ok, Sent_body} ->
                      trace_request_body(Sent_body),
                      Raw_req = list_to_binary([Req, Sent_body]),
                      NewReq_1 = NewReq#request{raw_req = Raw_req},
                      State_1 = State#state{reqs = queue:in(NewReq_1, State#state.reqs)},
                      State_2 = inc_pipeline_counter(State_1),
                      _ = active_once(State_2),
                      %% Only an idle connection starts reading headers for
                      %% this request; otherwise it waits in the queue.
                      State_3 = case Status of
                                    idle ->
                                        State_2#state{status = get_header,
                                                      cur_req = NewReq_1};
                                    _ ->
                                        State_2
                                end,
                      case StreamTo of
                          undefined ->
                              ok;
                          _ ->
                              %% Streaming callers get the request id at once.
                              gen_server:reply(From, {ibrowse_req_id, ReqId}),
                              case get_value(return_raw_request, Options, false) of
                                  false ->
                                      ok;
                                  true ->
                                      catch StreamTo ! {ibrowse_async_raw_req, Raw_req}
                              end
                      end,
                      State_4 = set_inac_timer(State_3),
                      {noreply, State_4};
                  Err ->
                      send_req_abort(From, send_failed, Err,
                                     "Send failed... Reason: ~p~n", State)
              end;
          Err ->
              send_req_abort(From, send_failed, Err,
                             "Send failed... Reason: ~p~n", State)
      end.

  %% Starts the per-request timeout timer; returns `undefined` for an
  %% infinite timeout, otherwise the timer reference.
  send_req_start_timer(infinity, _From) ->
      undefined;
  send_req_start_timer(Timeout, From) ->
      erlang:send_after(Timeout, self(), {req_timedout, From}).

  %% Common failure path of send_req_1/8: marks the connection as dying,
  %% traces the reason, replies {error, {Tag, Err}} to the caller and
  %% schedules the delayed stop.
  send_req_abort(From, Tag, Err, Trace_fmt, State) ->
      State_1 = State#state{proc_state = ?dead_proc_walking},
      shutting_down(State_1),
      do_trace(Trace_fmt, [Err]),
      gen_server:reply(From, {error, {Tag, Err}}),
      delayed_stop_timer(),
      {noreply, State_1}.
  %% Decorates the caller's headers before a request is built.
  %%
  %% For the `connect` method only proxy authentication is added. For any
  %% other method, unless the `headers_as_is` option is true, authentication
  %% headers and a "Host" header are prepended. The Host value comes from
  %% the `host_header` option when present; otherwise it is the URL host,
  %% with ":Port" appended for ports other than 80 and 443.
  maybe_modify_headers(#url{}, connect, _Options, Headers, State) ->
      add_proxy_auth_headers(State, Headers);
  maybe_modify_headers(#url{host = Host, port = Port} = Url,
                       _Method,
                       Options, Headers, State) ->
      case get_value(headers_as_is, Options, false) of
          true ->
              Headers;
          false ->
              With_auth = add_auth_headers(Url, Options, Headers, State),
              Host_value =
                  case lists:keyfind(host_header, 1, Options) of
                      {_, Custom_host} ->
                          Custom_host;
                      false when Port =:= 80; Port =:= 443 ->
                          Host;
                      false ->
                          [Host, ":", integer_to_list(Port)]
                  end,
              [{"Host", Host_value} | With_auth]
      end.
  %% Prepends a Basic "Authorization" header when credentials are known,
  %% then hands over to add_proxy_auth_headers/2.
  %%
  %% Credentials embedded in the URL win; otherwise the `basic_auth`
  %% option ({User, Password}) is consulted.
  add_auth_headers(#url{username = User,
                        password = UPw},
                   Options,
                   Headers,
                   State) ->
      Credentials = case User of
                        undefined -> get_value(basic_auth, Options, undefined);
                        _         -> {User, UPw}
                    end,
      With_auth = case Credentials of
                      undefined ->
                          Headers;
                      {U, P} ->
                          [{"Authorization", ["Basic ", http_auth_basic(U, P)]} | Headers]
                  end,
      add_proxy_auth_headers(State, With_auth).
  %% Prepends a Basic "Proxy-Authorization" header when the connection
  %% uses a proxy and encoded proxy credentials are stored in the state.
  add_proxy_auth_headers(#state{use_proxy = Use_proxy,
                                proxy_auth_basic = Auth_basic}, Headers) ->
      case Use_proxy =:= false orelse Auth_basic =:= [] of
          true ->
              Headers;
          false ->
              [{"Proxy-Authorization", ["Basic ", Auth_basic]} | Headers]
      end.
  %% Builds the encoded "user:password" credential for Basic authentication.
  %% Empty user and password yield [] (no credential).
  http_auth_basic([], []) ->
      [];
  http_auth_basic(Username, Password) ->
      Credential = Username ++ ":" ++ Password,
      ibrowse_lib:encode_base64(Credential).
  %% Builds the request line plus headers (as an iolist) and the body to send.
  %%
  %% Returns {RequestIoList, Body_1}. A Content-Length header is added for
  %% binary/list bodies when the caller did not provide one. With chunked
  %% transfer encoding, Content-Length is dropped, "Transfer-Encoding:
  %% chunked" is appended and the body is pre-chunked (default chunk size
  %% 5120). The absolute URI is used only for non-SSL requests that go
  %% through a proxy or set `use_absolute_uri`.
  make_request(Method, Headers, AbsPath, RelPath, Body, Options,
               #state{use_proxy = UseProxy, is_ssl = Is_ssl}, ReqId) ->
      HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
      %% Tag each header with its lower-cased name for lookups.
      Tag_header = fun({Name, Val}) when is_atom(Name) ->
                           {to_lower(atom_to_list(Name)), Name, Val};
                      ({Name, Val}) when is_list(Name); is_binary(Name) ->
                           {to_lower(Name), Name, Val}
                   end,
      Tagged = lists:map(Tag_header, Headers),
      Has_content_length = lists:keymember("content-length", 1, Tagged),
      Body_is_empty = (Body =:= [] orelse Body =:= <<>>),
      With_length =
          if
              Has_content_length ->
                  Tagged;
              Body_is_empty andalso (Method =:= post orelse Method =:= put) ->
                  [{"content-length", "Content-Length", "0"} | Tagged];
              is_binary(Body) orelse is_list(Body) ->
                  [{"content-length", "Content-Length", integer_to_list(iolist_size(Body))} | Tagged];
              true ->
                  %% Body is a function or function/state pair
                  Tagged
          end,
      {Wire_headers, Body_1} =
          case is_chunked_encoding_specified(Options) of
              true ->
                  Chunk_size = case get_value(transfer_encoding, Options) of
                                   {chunked, Size} -> Size;
                                   chunked         -> 5120
                               end,
                  Kept = [{Name, Val} || {Key, Name, Val} <- With_length,
                                         Key /= "content-length"],
                  {Kept ++ [{"Transfer-Encoding", "chunked"}],
                   chunk_request_body(Body, Chunk_size)};
              false ->
                  {[{Name, Val} || {_, Name, Val} <- With_length], Body}
          end,
      With_req_id =
          case lists:member({include_ibrowse_req_id, true}, Options) of
              true ->
                  [{"x-ibrowse-request-id", io_lib:format("~1000.p",[ReqId])} | Wire_headers];
              false ->
                  Wire_headers
          end,
      Encoded_headers = cons_headers(With_req_id),
      Want_absolute = get_value(use_absolute_uri, Options, false) or UseProxy,
      Uri = case {Want_absolute, Is_ssl} of
                {true, false} -> AbsPath;
                {true, true}  -> RelPath;
                {false, _}    -> RelPath
            end,
      {[method(Method), " ", Uri, " ", HttpVsn, crnl(), Encoded_headers, crnl()], Body_1}.
  %% Tells whether the request options ask for chunked transfer encoding,
  %% i.e. `transfer_encoding` is `chunked` or `{chunked, ChunkSize}`.
  is_chunked_encoding_specified(Options) ->
      case get_value(transfer_encoding, Options, false) of
          chunked      -> true;
          {chunked, _} -> true;
          false        -> false
      end.
  %% Maps a {Major, Minor} HTTP version tuple to its request-line token.
  %% Only 0.9, 1.0 and 1.1 are accepted.
  http_vsn_string({0,9}) ->
      "HTTP/0.9";
  http_vsn_string({1,0}) ->
      "HTTP/1.0";
  http_vsn_string({1,1}) ->
      "HTTP/1.1".
  %% Expands shorthand header tuples into their canonical HTTP names and
  %% passes the result to encode_headers/1.
  %%
  %% Recognised shorthands: {basic_auth, {User, Password}}, {cookie, _},
  %% {content_length, _} and {content_type, _}. Anything else is passed
  %% through untouched. Headers are prepended to the accumulator, so
  %% encode_headers/1 receives them in reverse input order.
  cons_headers(Headers) ->
      cons_headers(Headers, []).

  cons_headers(Headers, Acc) ->
      Expand = fun({basic_auth, {U, P}}) ->
                       {"Authorization",
                        ["Basic ", ibrowse_lib:encode_base64(U++":"++P)]};
                  ({cookie, Cookie}) ->
                       {"Cookie", Cookie};
                  ({content_length, L}) ->
                       {"Content-Length", L};
                  ({content_type, L}) ->
                       {"Content-Type", L};
                  (Other) ->
                       Other
               end,
      Expanded = lists:foldl(fun(H, Acc_1) -> [Expand(H) | Acc_1] end, Acc, Headers),
      encode_headers(Expanded).
  %% Renders {Name, Value} pairs as "Name: Value\r\n" iolist lines, keeping
  %% the input order. {http_vsn, _} entries are dropped. Names may be
  %% strings, binaries or atoms.
  encode_headers(L) ->
      encode_headers(L, []).

  encode_headers(Headers, Acc) ->
      Render = fun({http_vsn, _Val}, Lines) ->
                       Lines;
                  ({Name, Val}, Lines) when is_list(Name); is_binary(Name) ->
                       [[Name, ": ", fmt_val(Val), crnl()] | Lines];
                  ({Name, Val}, Lines) when is_atom(Name) ->
                       [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | Lines]
               end,
      lists:reverse(lists:foldl(Render, Acc, Headers)).
  %% Encodes a request body with HTTP chunked transfer encoding.
  %%
  %% Bodies given as a tuple or a fun are returned untouched. Binary and
  %% list bodies are cut into chunks of at most ChunkSize bytes, each framed
  %% as "<hex size>\r\n<data>\r\n", followed by the terminating
  %% "0\r\n\r\n". The result is an iolist.
  %%
  %% List bodies are flattened to bytes once up front, so the advertised
  %% chunk sizes are byte counts even for nested iolists, and the length is
  %% computed once instead of on every recursion. A list that is not a
  %% valid iolist now fails here with badarg.
  chunk_request_body(Body, _ChunkSize) when is_tuple(Body) orelse
                                            is_function(Body) ->
      Body;
  chunk_request_body(Body, ChunkSize) ->
      chunk_request_body(Body, ChunkSize, []).

  chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
      LastChunk = "0\r\n",
      lists:reverse(["\r\n", LastChunk | Acc]);
  chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
                                                byte_size(Body) >= ChunkSize ->
      <<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
      Chunk = [?dec2hex(ChunkSize),"\r\n",
               ChunkBody, "\r\n"],
      chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
  chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
      BodySize = byte_size(Body),
      Chunk = [?dec2hex(BodySize),"\r\n",
               Body, "\r\n"],
      LastChunk = "0\r\n",
      lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
  chunk_request_body(Body, ChunkSize, Acc) when is_list(Body) ->
      Flat = binary_to_list(iolist_to_binary(Body)),
      chunk_request_body_list(Flat, length(Flat), ChunkSize, Acc).

  %% Chunks a flat byte list whose length (Len) is already known.
  chunk_request_body_list([], _Len, _ChunkSize, Acc) ->
      LastChunk = "0\r\n",
      lists:reverse(["\r\n", LastChunk | Acc]);
  chunk_request_body_list(Body, Len, ChunkSize, Acc) when Len >= ChunkSize ->
      {ChunkBody, Rest} = split_list_at(Body, ChunkSize),
      Chunk = [?dec2hex(ChunkSize),"\r\n",
               ChunkBody, "\r\n"],
      chunk_request_body_list(Rest, Len - ChunkSize, ChunkSize, [Chunk | Acc]);
  chunk_request_body_list(Body, Len, _ChunkSize, Acc) ->
      Chunk = [?dec2hex(Len),"\r\n",
               Body, "\r\n"],
      LastChunk = "0\r\n",
      lists:reverse(["\r\n", LastChunk, Chunk | Acc]).
  %% Consumes response data for the request at the head of the pipeline.
  %%
  %% Returns the updated #state{} or {error, Reason}. With no current
  %% request, empty data just marks the connection idle and any other data
  %% is an error. Otherwise the data is appended to the buffered header
  %% bytes; once a full header block is present, the status line and
  %% headers decide how the body is read: no body (HEAD, 1xx, 204, 304,
  %% optionally 303), chunked, read-until-close, or Content-Length.
  %% Incomplete headers stay buffered, subject to the `max_headers_size`
  %% configuration value.
  parse_response(<<>>, #state{cur_req = undefined} = State) ->
      State#state{status = idle};
  parse_response(Data, #state{cur_req = undefined}) ->
      do_trace("Data left to process when no pending request. ~1000.p~n", [Data]),
      {error, data_in_status_idle};
  parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
                              cur_req = CurReq} = State) ->
      #request{stream_to = StreamTo, req_id = ReqId,
               method = Method,
               options = Options, timer_ref = T_ref
              } = CurReq,
      MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
      case scan_header(Acc, Data) of
          {yes, Headers, Data_1} ->
              do_trace("Recvd Header Data -> ~s~n----~n", [Headers]),
              do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]),
              {HttpVsn, StatCode, Headers_1, Status_line, Raw_headers} = parse_headers(Headers),
              do_trace("HttpVsn: ~p StatusCode: ~p Headers_1 -> ~1000.p~n", [HttpVsn, StatCode, Headers_1]),
              LCHeaders = [{to_lower(X), Y} || {X,Y} <- Headers_1],
              ConnClose = to_lower(get_header_value("connection", LCHeaders, "false")),
              IsClosing = is_connection_closing(HttpVsn, ConnClose),
              State_0 = case IsClosing of
                            true ->
                                shutting_down(State),
                                State#state{is_closing = IsClosing};
                            false ->
                                State
                        end,
              Give_raw_headers = get_value(give_raw_headers, Options, false),
              Give_raw_req     = get_value(return_raw_request, Options, false),
              State_1 = case Give_raw_headers of
                            true ->
                                State_0#state{recvd_headers=Headers_1, status=get_body,
                                              reply_buffer = <<>>,
                                              status_line = Status_line,
                                              raw_headers = Raw_headers,
                                              http_status_code=StatCode};
                            false ->
                                State_0#state{recvd_headers=Headers_1, status=get_body,
                                              status_line = Status_line,
                                              reply_buffer = <<>>,
                                              http_status_code=StatCode}
                        end,
              put(conn_close, ConnClose),
              TransferEncodings = to_lower(get_header_value("transfer-encoding", LCHeaders, "false")),
              IsChunked = lists:any(fun(Enc) -> string:strip(Enc) =:= "chunked" end,
                                    string:tokens(TransferEncodings, ",")),
              Head_response_with_body = lists:member({workaround, head_response_with_body}, Options),
              case get_header_value("content-length", LCHeaders, undefined) of
                  _ when Method == connect,
                         hd(StatCode) == $2 ->
                      %% Proxy accepted the CONNECT: drop it from the
                      %% queue and switch the socket over to SSL.
                      {_, Reqs_1} = queue:out(Reqs),
                      cancel_timer(T_ref),
                      upgrade_to_ssl(set_cur_request(State_0#state{reqs = Reqs_1,
                                                                   recvd_headers = [],
                                                                   status = idle
                                                                  }));
                  _ when Method == connect ->
                      {_, Reqs_1} = queue:out(Reqs),
                      do_error_reply(State#state{reqs = Reqs_1},
                                     {error, proxy_tunnel_failed}),
                      {error, proxy_tunnel_failed};
                  _ when Method =:= head,
                         Head_response_with_body =:= false ->
                      %% A HEAD response is answered without reading a
                      %% body. Some servers do send one (an Apache server
                      %% was observed to send an "empty" chunked body,
                      %% Issue #67 on Github); the
                      %% {workaround, head_response_with_body} option
                      %% skips this clause so that body gets consumed.
                      parse_response_no_body(Data_1, StatCode, Headers_1,
                                             Give_raw_headers, Give_raw_req, State_1);
                  _ when hd(StatCode) =:= $1 ->
                      %% No message body is expected. Server may send
                      %% one or more 1XX responses before a proper
                      %% response.
                      send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                      do_trace("Recvd a status code of ~p. Ignoring and waiting for a proper response~n", [StatCode]),
                      parse_response(Data_1, State_1#state{recvd_headers = [],
                                                           status = get_header});
                  _ when StatCode =:= "204";
                         StatCode =:= "304" ->
                      %% No message body is expected for these Status Codes.
                      %% RFC2616 - Sec 4.4
                      parse_response_no_body(Data_1, StatCode, Headers_1,
                                             Give_raw_headers, Give_raw_req, State_1);
                  _ when IsChunked ->
                      do_trace("Chunked encoding detected...~n",[]),
                      send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                      case parse_11_response(Data_1, State_1#state{transfer_encoding=chunked,
                                                                   chunk_size=chunk_start,
                                                                   reply_buffer = <<>>}) of
                          {error, Reason} ->
                              fail_pipelined_requests(State_1,
                                                      {error, {Reason,
                                                               {stat_code, StatCode}, Headers_1}}),
                              {error, Reason};
                          State_2 ->
                              State_2
                      end;
                  undefined when HttpVsn =:= "HTTP/1.0";
                                 ConnClose =:= "close" ->
                      %% No length given: the body runs until the server
                      %% closes the connection.
                      send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                      accumulate_response(Data_1, State_1);
                  undefined when StatCode =:= "303" ->
                      %% Some servers send 303 requests without a body.
                      %% RFC2616 says that they SHOULD, but they dont.
                      case ibrowse:get_config_value(allow_303_with_no_body, false) of
                          false ->
                              fail_pipelined_requests(State_1,
                                                      {error, {content_length_undefined,
                                                               {stat_code, StatCode}, Headers}}),
                              {error, content_length_undefined};
                          true ->
                              %% Same reply shape as the other body-less
                              %% responses, so return_raw_request is
                              %% honoured here too.
                              parse_response_no_body(Data_1, StatCode, Headers_1,
                                                     Give_raw_headers, Give_raw_req, State_1)
                      end;
                  undefined ->
                      fail_pipelined_requests(State_1,
                                              {error, {content_length_undefined,
                                                       {stat_code, StatCode}, Headers}}),
                      {error, content_length_undefined};
                  V ->
                      case catch list_to_integer(V) of
                          V_1 when is_integer(V_1), V_1 >= 0 ->
                              send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                              do_trace("Recvd Content-Length of ~p~n", [V_1]),
                              State_2 = State_1#state{rep_buf_size=0,
                                                      reply_buffer = <<>>,
                                                      content_length=V_1},
                              case parse_11_response(Data_1, State_2) of
                                  {error, Reason} ->
                                      fail_pipelined_requests(State_1,
                                                              {error, {Reason,
                                                                       {stat_code, StatCode}, Headers_1}}),
                                      {error, Reason};
                                  State_3 ->
                                      State_3
                              end;
                          _ ->
                              %% Content-Length is not a non-negative integer.
                              fail_pipelined_requests(State_1,
                                                      {error, {content_length_undefined,
                                                               {stat_code, StatCode}, Headers}}),
                              {error, content_length_undefined}
                      end
              end;
          {no, Acc_1} when MaxHeaderSize == infinity ->
              State#state{reply_buffer = Acc_1};
          {no, Acc_1} when byte_size(Acc_1) < MaxHeaderSize ->
              State#state{reply_buffer = Acc_1};
          {no, _Acc_1} ->
              fail_pipelined_requests(State, {error, max_headers_size_exceeded}),
              {error, max_headers_size_exceeded}
      end.

  %% Completes the current request with an empty body: streams the headers
  %% to an async caller, replies (appending the raw request when
  %% Give_raw_req is true), cancels the request timer, pops the request
  %% off the pipeline and continues parsing Data for the next one.
  parse_response_no_body(Data, StatCode, Headers, Give_raw_headers, Give_raw_req,
                         #state{reqs = Reqs, cur_req = CurReq} = State) ->
      #request{from = From, stream_to = StreamTo, req_id = ReqId,
               response_format = Resp_format, timer_ref = T_ref,
               raw_req = Raw_req} = CurReq,
      {_, Reqs_1} = queue:out(Reqs),
      send_async_headers(ReqId, StreamTo, Give_raw_headers, State),
      Reply = case Give_raw_req of
                  false ->
                      {ok, StatCode, Headers, []};
                  true ->
                      {ok, StatCode, Headers, [], Raw_req}
              end,
      State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
      cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
      State_2 = reset_state(State_1),
      State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
      parse_response(Data, State_3).
  %% Upgrades the established proxy tunnel socket to SSL.
  %%
  %% On success the SSL socket replaces the plain one, the tunnel is marked
  %% `done`, and the requests queued during tunnel setup are sent in the
  %% order they were queued. On failure an error reply is issued and
  %% {error, send_failed} is returned.
  upgrade_to_ssl(#state{socket = Plain_socket,
                        connect_timeout = Timeout,
                        ssl_options = Ssl_opts,
                        tunnel_setup_queue = Pending} = State) ->
      Result = ssl:connect(Plain_socket, Ssl_opts, Timeout),
      case Result of
          {ok, Ssl_socket} ->
              do_trace("Upgraded to SSL socket!!~n", []),
              Upgraded = State#state{socket = Ssl_socket,
                                     proxy_tunnel_setup = done},
              %% The queue was built by prepending, so the oldest is last.
              send_queued_requests(lists:reverse(Pending), Upgraded);
          Err ->
              do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
              do_error_reply(State, {error, {send_failed, Err}}),
              {error, send_failed}
      end.
  %% Sends, in list order, the requests that were parked while the proxy
  %% tunnel was being set up.
  %%
  %% Returns the state with an emptied `tunnel_setup_queue` once all are
  %% sent, or {error, send_failed} as soon as send_req_1/8 returns
  %% anything other than {noreply, State}.
  send_queued_requests([], State) ->
      do_trace("Sent all queued requests via SSL connection~n", []),
      State#state{tunnel_setup_queue = []};
  send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q],
                       State) ->
      case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
          {noreply, State_1} ->
              send_queued_requests(Q, State_1);
          Err ->
              %% Url is a #url{} record, so it is printed with ~p; ~s only
              %% accepts strings, binaries and atoms.
              do_trace("Error sending queued SSL request: ~n"
                       "URL : ~p~n"
                       "Method : ~p~n"
                       "Headers : ~p~n", [Url, Method, Headers]),
              do_error_reply(State, {error, {send_failed, Err}}),
              {error, send_failed}
      end.
  %% Decides whether the server will close the connection after this
  %% response, from the HTTP version string and the lower-cased Connection
  %% header value ("false" stands for an absent header).
  %%
  %% True for HTTP/0.9, for an explicit "close", and for HTTP/1.0 without a
  %% Connection header.
  is_connection_closing(HttpVsn, Connection) ->
      HttpVsn =:= "HTTP/0.9"
          orelse Connection =:= "close"
          orelse (HttpVsn =:= "HTTP/1.0" andalso Connection =:= "false").
  %% Consumes response body data. Returns the updated #state{} or
  %% {error, Reason}.
  %%
  %% For chunked responses the `chunk_size` field drives a small state
  %% machine: `chunk_start` (read the chunk-size line), an integer
  %% (collect that many bytes), `tbd` (skip the CRLF after a chunk), and
  %% 0 (end of transfer, read the trailer). Non-chunked responses are read
  %% according to `content_length`.

  %% This clause determines the chunk size when given data from the beginning of the chunk
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = chunk_start,
                           chunk_size_buffer = Chunk_sz_buf
                          } = State) ->
      case scan_crlf(Chunk_sz_buf, DataRecvd) of
          {yes, ChunkHeader, Data_1} ->
              State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
              ChunkSize = parse_chunk_header(ChunkHeader),
              %%
              %% Do we have to preserve the chunk encoding when
              %% streaming? NO. This should be transparent to the client
              %% process. Chunked encoding was only introduced to make
              %% it efficient for the server.
              %%
              RemLen = byte_size(Data_1),
              do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
                       [ChunkSize, RemLen]),
              parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
                                                      deleted_crlf = true,
                                                      recvd_chunk_size = 0,
                                                      chunk_size = ChunkSize});
          {no, Data_1} ->
              %% Chunk-size line incomplete: keep it for the next packet.
              State#state{chunk_size_buffer = Data_1}
      end;

  %% This clause is to remove the CRLF between two chunks
  %%
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = tbd,
                           chunk_size_buffer = Buf
                          } = State) ->
      case scan_crlf(Buf, DataRecvd) of
          {yes, _, NextChunk} ->
              State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
              State_2 = State_1#state{chunk_size = chunk_start,
                                      chunk_size_buffer = <<>>,
                                      deleted_crlf = true},
              parse_11_response(NextChunk, State_2);
          {no, Data_1} ->
              State#state{chunk_size_buffer = Data_1}
      end;

  %% This clause deals with the end of a chunked transfer. ibrowse does
  %% not support Trailers in the Chunked Transfer encoding. Any trailer
  %% received is silently discarded.
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked, chunk_size = 0,
                           cur_req = CurReq,
                           deleted_crlf = DelCrlf,
                           chunk_size_buffer = Trailer,
                           reqs = Reqs} = State) ->
      do_trace("Detected end of chunked transfer...~n", []),
      %% Put back the CRLF consumed with the chunk-size line so that
      %% scan_header/2 sees the complete trailer block.
      DataRecvd_1 = case DelCrlf of
                        false ->
                            DataRecvd;
                        true ->
                            <<$\r, $\n, DataRecvd/binary>>
                    end,
      case scan_header(Trailer, DataRecvd_1) of
          {yes, TEHeaders, Rem} ->
              {_, Reqs_1} = queue:out(Reqs),
              State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
              State_2 = handle_response(CurReq,
                                        State_1#state{reqs = Reqs_1}),
              parse_response(Rem, reset_state(State_2));
          {no, Rem} ->
              accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
      end;

  %% This clause extracts a chunk, given the size.
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = CSz,
                           recvd_chunk_size = Recvd_csz,
                           reply_buffer = RepBuf,
                           rep_buf_size = RepBufSz,
                           streamed_size = Streamed_size,
                           cur_req = CurReq} = State) ->
      NeedBytes = CSz - Recvd_csz,
      DataLen = byte_size(DataRecvd),
      do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]),
      case DataLen >= NeedBytes of
          true ->
              {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
              case CurReq of
                  #request{stream_to = StreamTo, caller_controls_socket = false, req_id = ReqId, stream_full_chunks = true, response_format = Response_format} ->
                      %% Full-chunk streaming: hand the completed chunk
                      %% to the caller instead of buffering it.
                      Chunk = <<RepBuf/binary, RemChunk/binary>>,
                      do_trace("Recvd another chunk...~p~n", [Chunk]),
                      do_trace("RemData -> ~p~n", [RemData]),
                      do_interim_reply(StreamTo, Response_format, ReqId, Chunk),
                      State_1 = State#state{
                                  reply_buffer = <<>>,
                                  rep_buf_size = RepBufSz + byte_size(RemChunk),
                                  interim_reply_sent = true,
                                  streamed_size = Streamed_size + CSz,
                                  chunk_size = tbd,
                                  recvd_chunk_size = 0},
                      parse_11_response(RemData, State_1);
                  _ ->
                      do_trace("Recvd another chunk...~p~n", [RemChunk]),
                      do_trace("RemData -> ~p~n", [RemData]),
                      case accumulate_response(RemChunk, State) of
                          {error, Reason} ->
                              do_trace("Error accumulating response --> ~p~n", [Reason]),
                              {error, Reason};
                          #state{} = State_1 ->
                              State_2 = State_1#state{chunk_size=tbd},
                              parse_11_response(RemData, State_2)
                      end
              end;
          false ->
              %% Chunk still incomplete: buffer what arrived.
              accumulate_response(DataRecvd,
                                  State#state{rep_buf_size = RepBufSz + DataLen,
                                              recvd_chunk_size = Recvd_csz + DataLen})
      end;

  %% This clause to extract the body when Content-Length is specified
  parse_11_response(DataRecvd,
                    #state{content_length=CL, rep_buf_size=RepBufSz,
                           reqs=Reqs}=State) ->
      NeedBytes = CL - RepBufSz,
      DataLen = byte_size(DataRecvd),
      case DataLen >= NeedBytes of
          true ->
              {RemBody, Rem} = split_binary(DataRecvd, NeedBytes),
              {_, Reqs_1} = queue:out(Reqs),
              %% accumulate_response/2 may return {error, Reason}; pass it
              %% on instead of treating it as a #state{} record.
              case accumulate_response(RemBody, State) of
                  {error, _Reason} = Err ->
                      Err;
                  #state{} = State_1 ->
                      State_2 = handle_response(State_1#state.cur_req, State_1#state{reqs=Reqs_1}),
                      State_3 = reset_state(State_2),
                      parse_response(Rem, State_3)
              end;
          false ->
              accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
      end.
  %% Accumulates chunk-framing bytes (size lines, CRLFs, trailers) into the
  %% response unless the current request has `preserve_chunked_encoding`
  %% set to false, in which case the state is returned unchanged.
  maybe_accumulate_ce_data(State, Data) ->
      case State of
          #state{cur_req = #request{preserve_chunked_encoding = false}} ->
              State;
          _ ->
              accumulate_response(Data, State)
      end.
  %% Delivers a completed response to the caller and moves on to the next
  %% queued request.
  %%
  %% The reply is {ok, Status, Headers, Body} with the raw request appended
  %% as a fifth element when `return_raw_request` is set. With
  %% `give_raw_headers`, Status/Headers are the status line and raw
  %% headers; otherwise the status code and parsed headers. The first
  %% clause handles responses saved to a file: it closes the temp file
  %% and replies with {file, Name} as the body when a file name is set.
  handle_response(#request{from = From, stream_to = StreamTo, req_id = ReqId,
                           response_format = Resp_format,
                           save_response_to_file = SaveResponseToFile,
                           tmp_file_name = TmpFilename,
                           tmp_file_fd = Fd,
                           options = Options,
                           timer_ref = ReqTimer,
                           raw_req = Raw_req},
                  #state{http_status_code = SCode,
                         status_line = Status_line,
                         raw_headers = Raw_headers,
                         reply_buffer = RepBuf,
                         recvd_headers = RespHeaders} = State)
    when SaveResponseToFile /= false ->
      ok = case Fd of
               undefined -> ok;
               _         -> file:close(Fd)
           end,
      ResponseBody = case TmpFilename of
                         undefined -> RepBuf;
                         _         -> {file, TmpFilename}
                     end,
      {Resp_headers_1, Raw_headers_1} =
          maybe_add_custom_headers(Status_line, RespHeaders, Raw_headers, Options),
      Give_raw_req = get_value(return_raw_request, Options, false),
      Reply = case {get_value(give_raw_headers, Options, false), Give_raw_req} of
                  {true, false} ->
                      {ok, Status_line, Raw_headers_1, ResponseBody};
                  {true, _} ->
                      {ok, Status_line, Raw_headers_1, ResponseBody, Raw_req};
                  {false, false} ->
                      {ok, SCode, Resp_headers_1, ResponseBody};
                  {false, _} ->
                      {ok, SCode, Resp_headers_1, ResponseBody, Raw_req}
              end,
      Replied = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
      cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
      set_cur_request(Replied);
  handle_response(#request{from = From, stream_to = StreamTo, req_id = ReqId,
                           response_format = Resp_format,
                           options = Options, timer_ref = ReqTimer,
                           raw_req = Raw_req},
                  #state{http_status_code = SCode,
                         status_line = Status_line,
                         raw_headers = Raw_headers,
                         recvd_headers = Resp_headers,
                         reply_buffer = RepBuf} = State) ->
      {Resp_headers_1, Raw_headers_1} =
          maybe_add_custom_headers(Status_line, Resp_headers, Raw_headers, Options),
      Give_raw_req = get_value(return_raw_request, Options, false),
      Reply = case {get_value(give_raw_headers, Options, false), Give_raw_req} of
                  {true, false} ->
                      {ok, Status_line, Raw_headers_1, RepBuf};
                  {true, _} ->
                      {ok, Status_line, Raw_headers_1, RepBuf, Raw_req};
                  {false, false} ->
                      {ok, SCode, Resp_headers_1, RepBuf};
                  {false, _} ->
                      {ok, SCode, Resp_headers_1, RepBuf, Raw_req}
              end,
      Replied = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
      cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
      set_cur_request(Replied).
  %% Clears all per-response fields so the connection is ready to read the
  %% headers of the next response.
  reset_state(State) ->
      State#state{
        %% parser position
        status            = get_header,
        transfer_encoding = undefined,
        chunk_size        = undefined,
        chunk_size_buffer = <<>>,
        deleted_crlf      = false,
        content_length    = undefined,
        %% buffered body and counters
        reply_buffer      = <<>>,
        rep_buf_size      = 0,
        streamed_size     = 0,
        %% status line and headers of the finished response
        http_status_code  = undefined,
        status_line       = undefined,
        recvd_headers     = [],
        raw_headers       = undefined
       }.
  %% Makes the request at the head of the queue the current one, or clears
  %% `cur_req` when the queue is empty. If the next request's caller
  %% controls the socket, the socket is re-armed with {active, once} first.
  set_cur_request(#state{reqs = Reqs, socket = Socket} = State) ->
      case queue:peek(Reqs) of
          {value, #request{caller_controls_socket = true} = NextReq} ->
              _ = do_setopts(Socket, [{active, once}], State),
              State#state{cur_req = NextReq};
          {value, #request{} = NextReq} ->
              State#state{cur_req = NextReq};
          empty ->
              State#state{cur_req = undefined}
      end.
  1544. parse_headers(Headers) ->
  1545. case scan_crlf(Headers) of
  1546. {yes, StatusLine, T} ->
  1547. parse_headers(StatusLine, T);
  1548. {no, StatusLine} ->
  1549. parse_headers(StatusLine, <<>>)
  1550. end.
  1551. parse_headers(StatusLine, Headers) ->
  1552. Headers_1 = parse_headers_1(Headers),
  1553. case parse_status_line(StatusLine) of
  1554. {ok, HttpVsn, StatCode, _Msg} ->
  1555. put(http_prot_vsn, HttpVsn),
  1556. {HttpVsn, StatCode, Headers_1, StatusLine, Headers};
  1557. _ -> %% A HTTP 0.9 response?
  1558. put(http_prot_vsn, "HTTP/0.9"),
  1559. {"HTTP/0.9", undefined, Headers, StatusLine, Headers}
  1560. end.
  1561. % From RFC 2616
  1562. %
  1563. % HTTP/1.1 header field values can be folded onto multiple lines if
  1564. % the continuation line begins with a space or horizontal tab. All
  1565. % linear white space, including folding, has the same semantics as
  1566. % SP. A recipient MAY replace any linear white space with a single
  1567. % SP before interpreting the field value or forwarding the message
  1568. % downstream.
  1569. parse_headers_1(B) when is_binary(B) ->
  1570. parse_headers_1(binary_to_list(B));
  1571. parse_headers_1(String) ->
  1572. parse_headers_1(String, [], []).
  1573. parse_headers_1([$\n, H |T], [$\r | L], Acc) when H =:= 32;
  1574. H =:= $\t ->
  1575. parse_headers_1(lists:dropwhile(fun(X) ->
  1576. is_whitespace(X)
  1577. end, T), [32 | L], Acc);
  1578. parse_headers_1([$\n, H |T], L, Acc) when H =:= 32;
  1579. H =:= $\t ->
  1580. parse_headers_1(lists:dropwhile(fun(X) ->
  1581. is_whitespace(X)
  1582. end, T), [32 | L], Acc);
  1583. parse_headers_1([$\n|T], [$\r | L], Acc) ->
  1584. case parse_header(lists:reverse(L)) of
  1585. invalid ->
  1586. parse_headers_1(T, [], Acc);
  1587. NewHeader ->
  1588. parse_headers_1(T, [], [NewHeader | Acc])
  1589. end;
  1590. parse_headers_1([$\n|T], L, Acc) ->
  1591. case parse_header(lists:reverse(L)) of
  1592. invalid ->
  1593. parse_headers_1(T, [], Acc);
  1594. NewHeader ->
  1595. parse_headers_1(T, [], [NewHeader | Acc])
  1596. end;
  1597. parse_headers_1([H|T], L, Acc) ->
  1598. parse_headers_1(T, [H|L], Acc);
  1599. parse_headers_1([], [], Acc) ->
  1600. lists:reverse(Acc);
  1601. parse_headers_1([], L, Acc) ->
  1602. Acc_1 = case parse_header(lists:reverse(L)) of
  1603. invalid ->
  1604. Acc;
  1605. NewHeader ->
  1606. [NewHeader | Acc]
  1607. end,
  1608. lists:reverse(Acc_1).
  1609. parse_status_line(Line) when is_binary(Line) ->
  1610. parse_status_line(binary_to_list(Line));
  1611. parse_status_line(Line) ->
  1612. parse_status_line(Line, get_prot_vsn, [], []).
  1613. parse_status_line([32 | T], get_prot_vsn, ProtVsn, StatCode) ->
  1614. parse_status_line(T, get_status_code, ProtVsn, StatCode);
  1615. parse_status_line([32 | T], get_status_code, ProtVsn, StatCode) ->
  1616. {ok, lists:reverse(ProtVsn), lists:reverse(StatCode), T};
  1617. parse_status_line([], get_status_code, ProtVsn, StatCode) ->
  1618. {ok, lists:reverse(ProtVsn), lists:reverse(StatCode), []};
  1619. parse_status_line([H | T], get_prot_vsn, ProtVsn, StatCode) ->
  1620. parse_status_line(T, get_prot_vsn, [H|ProtVsn], StatCode);
  1621. parse_status_line([H | T], get_status_code, ProtVsn, StatCode) ->
  1622. parse_status_line(T, get_status_code, ProtVsn, [H | StatCode]);
  1623. parse_status_line([], _, _, _) ->
  1624. http_09.
  1625. parse_header(L) ->
  1626. parse_header(L, []).
  1627. parse_header([$: | V], Acc) ->
  1628. {lists:reverse(Acc), string:strip(V)};
  1629. parse_header([H | T], Acc) ->
  1630. parse_header(T, [H | Acc]);
  1631. parse_header([], _) ->
  1632. invalid.
  1633. scan_header(Bin) ->
  1634. case get_crlf_crlf_pos(Bin, 0) of
  1635. {yes, Pos} ->
  1636. {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos),
  1637. {yes, Headers, Body};
  1638. {yes_dodgy, Pos} ->
  1639. {Headers, <<_:2/binary, Body/binary>>} = split_binary(Bin, Pos),
  1640. {yes, Headers, Body};
  1641. no ->
  1642. {no, Bin}
  1643. end.
  1644. scan_header(Bin1, Bin2) when size(Bin1) < 4 ->
  1645. scan_header(<<Bin1/binary, Bin2/binary>>);
  1646. scan_header(Bin1, <<>>) ->
  1647. scan_header(Bin1);
  1648. scan_header(Bin1, Bin2) ->
  1649. Bin1_already_scanned_size = size(Bin1) - 4,
  1650. <<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1,
  1651. Bin_to_scan = <<Rest/binary, Bin2/binary>>,
  1652. case get_crlf_crlf_pos(Bin_to_scan, 0) of
  1653. {yes, Pos} ->
  1654. {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
  1655. {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
  1656. {yes_dodgy, Pos} ->
  1657. {Headers_suffix, <<_:2/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
  1658. {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
  1659. no ->
  1660. {no, <<Bin1/binary, Bin2/binary>>}
  1661. end.
  1662. get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos};
  1663. get_crlf_crlf_pos(<<$\n, $\n, _/binary>>, Pos) -> {yes_dodgy, Pos};
  1664. get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1);
  1665. get_crlf_crlf_pos(<<>>, _) -> no.
  1666. scan_crlf(Bin) ->
  1667. case get_crlf_pos(Bin) of
  1668. {yes, Offset, Pos} ->
  1669. {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin, Pos),
  1670. {yes, Prefix, Suffix};
  1671. no ->
  1672. {no, Bin}
  1673. end.
  1674. scan_crlf(<<>>, Bin2) ->
  1675. scan_crlf(Bin2);
  1676. scan_crlf(Bin1, Bin2) when size(Bin1) < 2 ->
  1677. scan_crlf(<<Bin1/binary, Bin2/binary>>);
  1678. scan_crlf(Bin1, Bin2) ->
  1679. scan_crlf_1(size(Bin1) - 2, Bin1, Bin2).
  1680. scan_crlf_1(Bin1_head_size, Bin1, Bin2) ->
  1681. <<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1,
  1682. Bin3 = <<Bin1_tail/binary, Bin2/binary>>,
  1683. case get_crlf_pos(Bin3) of
  1684. {yes, Offset, Pos} ->
  1685. {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin3, Pos),
  1686. {yes, list_to_binary([Bin1_head, Prefix]), Suffix};
  1687. no ->
  1688. {no, list_to_binary([Bin1, Bin2])}
  1689. end.
  1690. get_crlf_pos(Bin) ->
  1691. get_crlf_pos(Bin, 0).
  1692. get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, 2, Pos};
  1693. get_crlf_pos(<<$\n, _/binary>>, Pos) -> {yes, 1, Pos};
  1694. get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1);
  1695. get_crlf_pos(<<>>, _) -> no.
  1696. fmt_val(L) when is_list(L) -> L;
  1697. fmt_val(I) when is_integer(I) -> integer_to_list(I);
  1698. fmt_val(A) when is_atom(A) -> atom_to_list(A);
  1699. fmt_val(B) when is_binary(B) -> B;
  1700. fmt_val(Term) -> io_lib:format("~p", [Term]).
  1701. crnl() -> "\r\n".
  1702. method(connect) -> "CONNECT";
  1703. method(delete) -> "DELETE";
  1704. method(get) -> "GET";
  1705. method(head) -> "HEAD";
  1706. method(options) -> "OPTIONS";
  1707. method(post) -> "POST";
  1708. method(put) -> "PUT";
  1709. method(trace) -> "TRACE";
  1710. %% webdav
  1711. method(copy) -> "COPY";
  1712. method(lock) -> "LOCK";
  1713. method(mkcol) -> "MKCOL";
  1714. method(move) -> "MOVE";
  1715. method(propfind) -> "PROPFIND";
  1716. method(proppatch) -> "PROPPATCH";
  1717. method(search) -> "SEARCH";
  1718. method(unlock) -> "UNLOCK";
  1719. %% subversion %%
  1720. method(report) -> "REPORT";
  1721. method(mkactivity) -> "MKACTIVITY";
  1722. method(checkout) -> "CHECKOUT";
  1723. method(merge) -> "MERGE";
  1724. %% upnp
  1725. method(msearch) -> "MSEARCH";
  1726. method(notify) -> "NOTIFY";
  1727. method(subscribe) -> "SUBSCRIBE";
  1728. method(unsubscribe) -> "UNSUBSCRIBE";
  1729. %% rfc-5789
  1730. method(patch) -> "PATCH";
  1731. method(purge) -> "PURGE".
  1732. %% From RFC 2616
  1733. %%
  1734. % The chunked encoding modifies the body of a message in order to
  1735. % transfer it as a series of chunks, each with its own size indicator,
  1736. % followed by an OPTIONAL trailer containing entity-header
  1737. % fields. This allows dynamically produced content to be transferred
  1738. % along with the information necessary for the recipient to verify
  1739. % that it has received the full message.
  1740. % Chunked-Body = *chunk
  1741. % last-chunk
  1742. % trailer
  1743. % CRLF
  1744. % chunk = chunk-size [ chunk-extension ] CRLF
  1745. % chunk-data CRLF
  1746. % chunk-size = 1*HEX
  1747. % last-chunk = 1*("0") [ chunk-extension ] CRLF
  1748. % chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
  1749. % chunk-ext-name = token
  1750. % chunk-ext-val = token | quoted-string
  1751. % chunk-data = chunk-size(OCTET)
  1752. % trailer = *(entity-header CRLF)
  1753. % The chunk-size field is a string of hex digits indicating the size
  1754. % of the chunk. The chunked encoding is ended by any chunk whose size
  1755. % is zero, followed by the trailer, which is terminated by an empty
  1756. % line.
  1757. %%
  1758. %% The parsing implemented here discards all chunk extensions. It also
  1759. %% strips trailing spaces from the chunk size fields as Apache 1.3.27 was
  1760. %% sending them.
  1761. parse_chunk_header(ChunkHeader) ->
  1762. parse_chunk_header(ChunkHeader, []).
  1763. parse_chunk_header(<<$;, _/binary>>, Acc) ->
  1764. hexlist_to_integer(lists:reverse(Acc));
  1765. parse_chunk_header(<<H, T/binary>>, Acc) ->
  1766. case is_whitespace(H) of
  1767. true ->
  1768. parse_chunk_header(T, Acc);
  1769. false ->
  1770. parse_chunk_header(T, [H | Acc])
  1771. end;
  1772. parse_chunk_header(<<>>, Acc) ->
  1773. hexlist_to_integer(lists:reverse(Acc)).
  1774. is_whitespace($\s) -> true;
  1775. is_whitespace($\r) -> true;
  1776. is_whitespace($\n) -> true;
  1777. is_whitespace($\t) -> true;
  1778. is_whitespace(_) -> false.
  1779. send_async_headers(_ReqId, undefined, _, _State) ->
  1780. ok;
  1781. send_async_headers(ReqId, StreamTo, Give_raw_headers,
  1782. #state{status_line = Status_line, raw_headers = Raw_headers,
  1783. recvd_headers = Headers, http_status_code = StatCode,
  1784. cur_req = #request{options = Opts}
  1785. }) ->
  1786. {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts),
  1787. case Give_raw_headers of
  1788. false ->
  1789. catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
  1790. true ->
  1791. catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
  1792. end.
  1793. maybe_add_custom_headers(Status_line, Headers, Raw_headers, Opts) ->
  1794. Custom_headers = get_value(add_custom_headers, Opts, []),
  1795. Headers_1 = Headers ++ Custom_headers,
  1796. Raw_headers_1 = case Custom_headers of
  1797. [_ | _] when is_binary(Raw_headers) ->
  1798. Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
  1799. <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
  1800. _ ->
  1801. Raw_headers
  1802. end,
  1803. case get_value(preserve_status_line, Opts, false) of
  1804. true ->
  1805. {[{ibrowse_status_line, Status_line} | Headers_1], Raw_headers_1};
  1806. false ->
  1807. {Headers_1, Raw_headers_1}
  1808. end.
  1809. format_response_data(Resp_format, Body) ->
  1810. case Resp_format of
  1811. list when is_list(Body) ->
  1812. flatten(Body);
  1813. list when is_binary(Body) ->
  1814. binary_to_list(Body);
  1815. binary when is_list(Body) ->
  1816. list_to_binary(Body);
  1817. _ ->
  1818. %% This is to cater for sending messages such as
  1819. %% {chunk_start, _}, chunk_end etc
  1820. Body
  1821. end.
  1822. %% dont message an unexisting server
  1823. %% triggered by :stop or :tcp_closed on an unactive connection
  1824. do_reply(State, undefined, undefined, _, _, _Msg) ->
  1825. dec_pipeline_counter(State);
  1826. do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
  1827. Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
  1828. gen_server:reply(From, Msg_1),
  1829. dec_pipeline_counter(State);
  1830. do_reply(State, From, undefined, _, _, Msg) ->
  1831. gen_server:reply(From, Msg),
  1832. dec_pipeline_counter(State);
  1833. do_reply(#state{prev_req_id = Prev_req_id} = State,
  1834. _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
  1835. State_1 = dec_pipeline_counter(State),
  1836. case Body of
  1837. [] ->
  1838. ok;
  1839. _ ->
  1840. Body_1 = format_response_data(Resp_format, Body),
  1841. catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
  1842. end,
  1843. catch StreamTo ! {ibrowse_async_response_end, ReqId},
  1844. %% We don't want to delete the Req-id to Pid mapping straightaway
  1845. %% as the client may send a stream_next message just while we are
  1846. %% sending back this ibrowse_async_response_end message. If we
  1847. %% deleted this mapping straightaway, the caller will see a
  1848. %% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
  1849. %% get around this, we store the req id, and clear it after the
  1850. %% next request. If there are wierd combinations of stream,
  1851. %% stream_once and sync requests on the same connection, it will
  1852. %% take a while for the req_id-pid mapping to get cleared, but it
  1853. %% should do no harm.
  1854. ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
  1855. State_1#state{prev_req_id = ReqId};
  1856. do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
  1857. State_1 = dec_pipeline_counter(State),
  1858. Msg_1 = format_response_data(Resp_format, Msg),
  1859. catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
  1860. State_1.
  1861. do_interim_reply(undefined, _, _ReqId, _Msg) ->
  1862. ok;
  1863. do_interim_reply(StreamTo, Response_format, ReqId, Msg) ->
  1864. Msg_1 = format_response_data(Response_format, Msg),
  1865. catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}.
  1866. do_error_reply(#state{reqs = Reqs, tunnel_setup_queue = Tun_q} = State, Err) ->
  1867. ReqList = queue:to_list(Reqs),
  1868. lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
  1869. response_format = Resp_format}) ->
  1870. ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
  1871. do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
  1872. end, ReqList),
  1873. lists:foreach(
  1874. fun({From, _Url, _Headers, _Method, _Body, _Options, _Timeout}) ->
  1875. do_reply(State, From, undefined, undefined, undefined, Err)
  1876. end, Tun_q).
  1877. fail_pipelined_requests(#state{reqs = Reqs, cur_req = CurReq} = State, Reply) ->
  1878. {_, Reqs_1} = queue:out(Reqs),
  1879. #request{from=From, stream_to=StreamTo, req_id=ReqId,
  1880. response_format = Resp_format} = CurReq,
  1881. State_1 = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
  1882. do_error_reply(State_1#state{reqs = Reqs_1}, previous_request_failed).
  1883. split_list_at(List, N) ->
  1884. split_list_at(List, N, []).
  1885. split_list_at([], _, Acc) ->
  1886. {lists:reverse(Acc), []};
  1887. split_list_at(List2, 0, List1) ->
  1888. {lists:reverse(List1), List2};
  1889. split_list_at([H | List2], N, List1) ->
  1890. split_list_at(List2, N-1, [H | List1]).
  1891. hexlist_to_integer(List) ->
  1892. hexlist_to_integer(lists:reverse(List), 1, 0).
  1893. hexlist_to_integer([H | T], Multiplier, Acc) ->
  1894. hexlist_to_integer(T, Multiplier*16, Multiplier*to_ascii(H) + Acc);
  1895. hexlist_to_integer([], _, Acc) ->
  1896. Acc.
  1897. to_ascii($A) -> 10;
  1898. to_ascii($a) -> 10;
  1899. to_ascii($B) -> 11;
  1900. to_ascii($b) -> 11;
  1901. to_ascii($C) -> 12;
  1902. to_ascii($c) -> 12;
  1903. to_ascii($D) -> 13;
  1904. to_ascii($d) -> 13;
  1905. to_ascii($E) -> 14;
  1906. to_ascii($e) -> 14;
  1907. to_ascii($F) -> 15;
  1908. to_ascii($f) -> 15;
  1909. to_ascii($1) -> 1;
  1910. to_ascii($2) -> 2;
  1911. to_ascii($3) -> 3;
  1912. to_ascii($4) -> 4;
  1913. to_ascii($5) -> 5;
  1914. to_ascii($6) -> 6;
  1915. to_ascii($7) -> 7;
  1916. to_ascii($8) -> 8;
  1917. to_ascii($9) -> 9;
  1918. to_ascii($0) -> 0.
  1919. cancel_timer(undefined) -> ok;
  1920. cancel_timer(Ref) -> _ = erlang:cancel_timer(Ref),
  1921. ok.
  1922. cancel_timer(Ref, {eat_message, Msg}) ->
  1923. cancel_timer(Ref),
  1924. receive
  1925. Msg ->
  1926. ok
  1927. after 0 ->
  1928. ok
  1929. end.
  1930. make_req_id() ->
  1931. generate_timestamp().
  1932. to_lower(Str) when is_binary(Str) ->
  1933. to_lower(binary_to_list(Str));
  1934. to_lower(Str) ->
  1935. to_lower(Str, []).
  1936. to_lower([H|T], Acc) when H >= $A, H =< $Z ->
  1937. to_lower(T, [H+32|Acc]);
  1938. to_lower([H|T], Acc) ->
  1939. to_lower(T, [H|Acc]);
  1940. to_lower([], Acc) ->
  1941. lists:reverse(Acc).
  1942. shutting_down(#state{lb_ets_tid = undefined}) ->
  1943. ok;
  1944. shutting_down(#state{lb_ets_tid = Tid,
  1945. cur_pipeline_size = _Sz}) ->
  1946. (catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])).
  1947. inc_pipeline_counter(#state{is_closing = true} = State) ->
  1948. State;
  1949. inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
  1950. State;
  1951. inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
  1952. State#state{cur_pipeline_size = Pipe_sz + 1}.
  1953. dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
  1954. lb_ets_tid = Tid,
  1955. proc_state = Proc_state} = State) when Tid /= undefined,
  1956. Proc_state /= ?dead_proc_walking ->
  1957. Ts = os:timestamp(),
  1958. catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
  1959. (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
  1960. [{'==', '$1', {const,self()}},
  1961. {'<', '$2', {const,Ts}}
  1962. ],
  1963. [true]}])),
  1964. State#state{cur_pipeline_size = Pipe_sz - 1};
  1965. dec_pipeline_counter(State) ->
  1966. State.
  1967. flatten([H | _] = L) when is_integer(H) ->
  1968. L;
  1969. flatten([H | _] = L) when is_list(H) ->
  1970. lists:flatten(L);
  1971. flatten([]) ->
  1972. [].
  1973. get_stream_chunk_size(Options) ->
  1974. case get_value(stream_full_chunks, Options, false) of
  1975. true ->
  1976. infinity;
  1977. _ ->
  1978. case lists:keysearch(stream_chunk_size, 1, Options) of
  1979. {value, {_, V}} when V > 0 ->
  1980. V;
  1981. _ ->
  1982. ?DEFAULT_STREAM_CHUNK_SIZE
  1983. end
  1984. end.
  1985. set_inac_timer(State) ->
  1986. cancel_timer(State#state.inactivity_timer_ref),
  1987. set_inac_timer(State#state{inactivity_timer_ref = undefined},
  1988. get_inac_timeout(State)).
  1989. set_inac_timer(State, Timeout) when is_integer(Timeout) ->
  1990. Ref = erlang:send_after(Timeout, self(), timeout),
  1991. State#state{inactivity_timer_ref = Ref};
  1992. set_inac_timer(State, _) ->
  1993. State.
  1994. get_inac_timeout(#state{cur_req = #request{options = Opts}}) ->
  1995. get_value(inactivity_timeout, Opts, infinity);
  1996. get_inac_timeout(#state{cur_req = undefined}) ->
  1997. case ibrowse:get_config_value(inactivity_timeout, undefined) of
  1998. Val when is_integer(Val) ->
  1999. Val;
  2000. _ ->
  2001. case application:get_env(ibrowse, inactivity_timeout) of
  2002. {ok, Val} when is_integer(Val), Val > 0 ->
  2003. Val;
  2004. _ ->
  2005. 10000
  2006. end
  2007. end.
  2008. trace_request(Req) ->
  2009. case get(my_trace_flag) of
  2010. true ->
  2011. %%Avoid the binary operations if trace is not on...
  2012. NReq = to_binary(Req),
  2013. do_trace("Sending request: ~n"
  2014. "--- Request Begin ---~n~s~n"
  2015. "--- Request End ---~n", [NReq]);
  2016. _ -> ok
  2017. end.
  2018. trace_request_body(Body) ->
  2019. case get(my_trace_flag) of
  2020. true ->
  2021. %%Avoid the binary operations if trace is not on...
  2022. NBody = to_binary(Body),
  2023. case size(NBody) > 1024 of
  2024. true ->
  2025. ok;
  2026. false ->
  2027. do_trace("Sending request body: ~n"
  2028. "--- Request Body Begin ---~n~s~n"
  2029. "--- Request Body End ---~n", [NBody])
  2030. end;
  2031. false ->
  2032. ok
  2033. end.
  2034. to_binary({X, _}) when is_function(X) -> to_binary(X);
  2035. to_binary(X) when is_function(X) -> <<"body generated by function">>;
  2036. to_binary(X) when is_list(X) -> list_to_binary(X);
  2037. to_binary(X) when is_binary(X) -> X.
  2038. get_header_value(Name, Headers, Default_val) ->
  2039. case lists:keysearch(Name, 1, Headers) of
  2040. false ->
  2041. Default_val;
  2042. {value, {_, Val}} when is_binary(Val) ->
  2043. binary_to_list(Val);
  2044. {value, {_, Val}} ->
  2045. Val
  2046. end.
  2047. delayed_stop_timer() ->
  2048. erlang:send_after(500, self(), delayed_stop).
  2049. maybe_trap_exits() ->
  2050. case ibrowse:get_config_value(worker_trap_exits, true) of
  2051. true -> process_flag(trap_exit, true);
  2052. false -> ok
  2053. end.