You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1823 line
74 KiB

14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
Use Erlang's OTP base64 module (available since R12B02) and avoid duplicated base64 encoding/decoding code in ibrowse_lib.erl and ibrowse_http_client.erl. OTP's base64 module is also more efficient (C implementation): 1> Data = crypto:rand_bytes(4096). <<205,174,13,169,97,159,110,161,71,43,226,153,42,101,243, 83,11,96,23,161,253,251,129,240,163,216,58,175,190,...>> 2> 2> timer:tc(ibrowse_lib, encode_base64, [Data]). {2920, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 3> timer:tc(ibrowse_lib, encode_base64, [Data]). {1221, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 4> timer:tc(ibrowse_lib, encode_base64, [Data]). {1436, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 5> timer:tc(ibrowse_lib, encode_base64, [Data]). {1195, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 6> 6> timer:tc(base64, encode, [Data]). {1846, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 7> timer:tc(base64, encode, [Data]). {743, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 8> timer:tc(base64, encode, [Data]). {737, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>} 9> timer:tc(base64, encode, [Data]). {656, <<"za4NqWGfbqFHK+KZKmXzUwtgF6H9+4Hwo9g6r77h2EF1/Xk1oKOIOmnAkgtv41LPXg37fp2dlr45C8qCA9/8zrcc9F5zr2JT0eVPTrh5aahl"...>>}
15 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
14 年之前
  1. %%%-------------------------------------------------------------------
  2. %%% File : ibrowse_http_client.erl
  3. %%% Author : Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  4. %%% Description : The name says it all
  5. %%%
  6. %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
  7. %%%-------------------------------------------------------------------
  8. -module(ibrowse_http_client).
  9. -behaviour(gen_server).
  10. %%--------------------------------------------------------------------
  11. %% Include files
  12. %%--------------------------------------------------------------------
  13. %%--------------------------------------------------------------------
  14. %% External exports
  15. -export([
  16. start_link/1,
  17. start/1,
  18. stop/1,
  19. send_req/7
  20. ]).
  21. -ifdef(debug).
  22. -compile(export_all).
  23. -endif.
  24. %% gen_server callbacks
  25. -export([
  26. init/1,
  27. handle_call/3,
  28. handle_cast/2,
  29. handle_info/2,
  30. terminate/2,
  31. code_change/3
  32. ]).
  33. -include("ibrowse.hrl").
  34. -record(state, {host, port, connect_timeout,
  35. inactivity_timer_ref,
  36. use_proxy = false, proxy_auth_digest,
  37. ssl_options = [], is_ssl = false, socket,
  38. proxy_tunnel_setup = false,
  39. tunnel_setup_queue = [],
  40. reqs=queue:new(), cur_req, status=idle, http_status_code,
  41. reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
  42. recvd_headers=[],
  43. status_line, raw_headers,
  44. is_closing, send_timer, content_length,
  45. deleted_crlf = false, transfer_encoding,
  46. chunk_size, chunk_size_buffer = <<>>,
  47. recvd_chunk_size, interim_reply_sent = false,
  48. lb_ets_tid, cur_pipeline_size = 0, prev_req_id
  49. }).
  50. -record(request, {url, method, options, from,
  51. stream_to, caller_controls_socket = false,
  52. caller_socket_options = [],
  53. req_id,
  54. stream_chunk_size,
  55. save_response_to_file = false,
  56. tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
  57. response_format}).
  58. -import(ibrowse_lib, [
  59. get_value/2,
  60. get_value/3,
  61. do_trace/2
  62. ]).
  63. -define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
  64. -define(dec2hex(X), erlang:integer_to_list(X, 16)).
  65. %%====================================================================
  66. %% External functions
  67. %%====================================================================
  68. %%--------------------------------------------------------------------
  69. %% Function: start_link/0
  70. %% Description: Starts the server
  71. %%--------------------------------------------------------------------
  72. start(Args) ->
  73. gen_server:start(?MODULE, Args, []).
  74. start_link(Args) ->
  75. gen_server:start_link(?MODULE, Args, []).
  76. stop(Conn_pid) ->
  77. case catch gen_server:call(Conn_pid, stop) of
  78. {'EXIT', {timeout, _}} ->
  79. exit(Conn_pid, kill),
  80. ok;
  81. _ ->
  82. ok
  83. end.
  84. send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
  85. gen_server:call(
  86. Conn_Pid,
  87. {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout).
  88. %%====================================================================
  89. %% Server functions
  90. %%====================================================================
  91. %%--------------------------------------------------------------------
  92. %% Function: init/1
  93. %% Description: Initiates the server
  94. %% Returns: {ok, State} |
  95. %% {ok, State, Timeout} |
  96. %% ignore |
  97. %% {stop, Reason}
  98. %%--------------------------------------------------------------------
  99. init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
  100. State = #state{host = Host,
  101. port = Port,
  102. ssl_options = SSLOptions,
  103. is_ssl = Is_ssl,
  104. lb_ets_tid = Lb_Tid},
  105. put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
  106. put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
  107. {ok, State};
  108. init(Url) when is_list(Url) ->
  109. case catch ibrowse_lib:parse_url(Url) of
  110. #url{protocol = Protocol} = Url_rec ->
  111. init({undefined, Url_rec, {[], Protocol == https}});
  112. {'EXIT', _} ->
  113. {error, invalid_url}
  114. end;
  115. init({Host, Port}) ->
  116. State = #state{host = Host,
  117. port = Port},
  118. put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
  119. put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
  120. {ok, State}.
  121. %%--------------------------------------------------------------------
  122. %% Function: handle_call/3
  123. %% Description: Handling call messages
  124. %% Returns: {reply, Reply, State} |
  125. %% {reply, Reply, State, Timeout} |
  126. %% {noreply, State} |
  127. %% {noreply, State, Timeout} |
  128. %% {stop, Reason, Reply, State} | (terminate/2 is called)
  129. %% {stop, Reason, State} (terminate/2 is called)
  130. %%--------------------------------------------------------------------
  131. %% Received a request when the remote server has already sent us a
  132. %% Connection: Close header
  133. handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
  134. {reply, {error, connection_closing}, State};
  135. handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
  136. From, State) ->
  137. send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
  138. handle_call(stop, _From, State) ->
  139. do_close(State),
  140. do_error_reply(State, closing_on_request),
  141. {stop, normal, ok, State};
  142. handle_call(Request, _From, State) ->
  143. Reply = {unknown_request, Request},
  144. {reply, Reply, State}.
  145. %%--------------------------------------------------------------------
  146. %% Function: handle_cast/2
  147. %% Description: Handling cast messages
  148. %% Returns: {noreply, State} |
  149. %% {noreply, State, Timeout} |
  150. %% {stop, Reason, State} (terminate/2 is called)
  151. %%--------------------------------------------------------------------
  152. handle_cast(_Msg, State) ->
  153. {noreply, State}.
  154. %%--------------------------------------------------------------------
  155. %% Function: handle_info/2
  156. %% Description: Handling all non call/cast messages
  157. %% Returns: {noreply, State} |
  158. %% {noreply, State, Timeout} |
  159. %% {stop, Reason, State} (terminate/2 is called)
  160. %%--------------------------------------------------------------------
  161. handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
  162. %% io:format("Recvd data: ~p~n", [Data]),
  163. do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
  164. handle_sock_data(Data, State);
  165. handle_info({ssl, _Sock, Data}, State) ->
  166. handle_sock_data(Data, State);
  167. handle_info({stream_next, Req_id}, #state{socket = Socket,
  168. cur_req = #request{req_id = Req_id}} = State) ->
  169. %% io:format("Client process set {active, once}~n", []),
  170. do_setopts(Socket, [{active, once}], State),
  171. {noreply, State};
  172. handle_info({stream_next, _Req_id}, State) ->
  173. _Cur_req_id = case State#state.cur_req of
  174. #request{req_id = Cur} ->
  175. Cur;
  176. _ ->
  177. undefined
  178. end,
  179. %% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n",
  180. %% [_Req_id, _Cur_req_id]),
  181. {noreply, State};
  182. handle_info({stream_close, _Req_id}, State) ->
  183. shutting_down(State),
  184. do_close(State),
  185. do_error_reply(State, closing_on_request),
  186. {stop, normal, State};
  187. handle_info({tcp_closed, _Sock}, State) ->
  188. do_trace("TCP connection closed by peer!~n", []),
  189. handle_sock_closed(State),
  190. {stop, normal, State};
  191. handle_info({ssl_closed, _Sock}, State) ->
  192. do_trace("SSL connection closed by peer!~n", []),
  193. handle_sock_closed(State),
  194. {stop, normal, State};
  195. handle_info({tcp_error, _Sock}, State) ->
  196. do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
  197. handle_sock_closed(State),
  198. {stop, normal, State};
  199. handle_info({ssl_error, _Sock}, State) ->
  200. do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
  201. handle_sock_closed(State),
  202. {stop, normal, State};
  203. handle_info({req_timedout, From}, State) ->
  204. case lists:keymember(From, #request.from, queue:to_list(State#state.reqs)) of
  205. false ->
  206. {noreply, State};
  207. true ->
  208. shutting_down(State),
  209. do_error_reply(State, req_timedout),
  210. {stop, normal, State}
  211. end;
  212. handle_info(timeout, State) ->
  213. receive
  214. {tcp, _Sock, Data} ->
  215. handle_sock_data(Data, State);
  216. {ssl, _Sock, Data} ->
  217. handle_sock_data(Data, State)
  218. after 0 ->
  219. do_trace("Inactivity timeout triggered. Shutting down connection~n", []),
  220. shutting_down(State),
  221. do_error_reply(State, req_timedout),
  222. {stop, normal, State}
  223. end;
  224. handle_info({trace, Bool}, State) ->
  225. put(my_trace_flag, Bool),
  226. {noreply, State};
  227. handle_info(Info, State) ->
  228. io:format("Unknown message recvd for ~1000.p:~1000.p -> ~p~n",
  229. [State#state.host, State#state.port, Info]),
  230. io:format("Recvd unknown message ~p when in state: ~p~n", [Info, State]),
  231. {noreply, State}.
  232. %%--------------------------------------------------------------------
  233. %% Function: terminate/2
  234. %% Description: Shutdown the server
  235. %% Returns: any (ignored by gen_server)
  236. %%--------------------------------------------------------------------
  237. terminate(_Reason, State) ->
  238. do_close(State),
  239. ok.
  240. %%--------------------------------------------------------------------
  241. %% Func: code_change/3
  242. %% Purpose: Convert process state when code is changed
  243. %% Returns: {ok, NewState}
  244. %%--------------------------------------------------------------------
  245. code_change(_OldVsn, State, _Extra) ->
  246. {ok, State}.
  247. %%--------------------------------------------------------------------
  248. %%% Internal functions
  249. %%--------------------------------------------------------------------
  250. %%--------------------------------------------------------------------
  251. %% Handles data recvd on the socket
  252. %%--------------------------------------------------------------------
  253. handle_sock_data(Data, #state{status=idle}=State) ->
  254. do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
  255. shutting_down(State),
  256. do_error_reply(State, data_in_status_idle),
  257. do_close(State),
  258. {stop, normal, State};
  259. handle_sock_data(Data, #state{status = get_header}=State) ->
  260. case parse_response(Data, State) of
  261. {error, _Reason} ->
  262. shutting_down(State),
  263. {stop, normal, State};
  264. State_1 ->
  265. active_once(State_1),
  266. State_2 = set_inac_timer(State_1),
  267. {noreply, State_2}
  268. end;
  269. handle_sock_data(Data, #state{status = get_body,
  270. socket = Socket,
  271. content_length = CL,
  272. http_status_code = StatCode,
  273. recvd_headers = Headers,
  274. chunk_size = CSz} = State) ->
  275. case (CL == undefined) and (CSz == undefined) of
  276. true ->
  277. case accumulate_response(Data, State) of
  278. {error, Reason} ->
  279. shutting_down(State),
  280. fail_pipelined_requests(State,
  281. {error, {Reason, {stat_code, StatCode}, Headers}}),
  282. {stop, normal, State};
  283. State_1 ->
  284. active_once(State_1),
  285. State_2 = set_inac_timer(State_1),
  286. {noreply, State_2}
  287. end;
  288. _ ->
  289. case parse_11_response(Data, State) of
  290. {error, Reason} ->
  291. shutting_down(State),
  292. fail_pipelined_requests(State,
  293. {error, {Reason, {stat_code, StatCode}, Headers}}),
  294. {stop, normal, State};
  295. #state{cur_req = #request{caller_controls_socket = Ccs},
  296. interim_reply_sent = Irs} = State_1 ->
  297. case Irs of
  298. true ->
  299. active_once(State_1);
  300. false when Ccs == true ->
  301. do_setopts(Socket, [{active, once}], State);
  302. false ->
  303. active_once(State_1)
  304. end,
  305. State_2 = State_1#state{interim_reply_sent = false},
  306. State_3 = set_inac_timer(State_2),
  307. {noreply, State_3};
  308. State_1 ->
  309. active_once(State_1),
  310. State_2 = set_inac_timer(State_1),
  311. {noreply, State_2}
  312. end
  313. end.
  314. accumulate_response(Data,
  315. #state{
  316. cur_req = #request{save_response_to_file = Srtf,
  317. tmp_file_fd = undefined} = CurReq,
  318. http_status_code=[$2 | _]}=State) when Srtf /= false ->
  319. TmpFilename = make_tmp_filename(Srtf),
  320. case file:open(TmpFilename, [write, delayed_write, raw]) of
  321. {ok, Fd} ->
  322. accumulate_response(Data, State#state{
  323. cur_req = CurReq#request{
  324. tmp_file_fd = Fd,
  325. tmp_file_name = TmpFilename}});
  326. {error, Reason} ->
  327. {error, {file_open_error, Reason}}
  328. end;
  329. accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf,
  330. tmp_file_fd = Fd},
  331. transfer_encoding=chunked,
  332. reply_buffer = Reply_buf,
  333. http_status_code=[$2 | _]
  334. } = State) when Srtf /= false ->
  335. case file:write(Fd, [Reply_buf, Data]) of
  336. ok ->
  337. State#state{reply_buffer = <<>>};
  338. {error, Reason} ->
  339. {error, {file_write_error, Reason}}
  340. end;
  341. accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf,
  342. tmp_file_fd = Fd},
  343. reply_buffer = RepBuf,
  344. http_status_code=[$2 | _]
  345. } = State) when Srtf /= false ->
  346. case file:write(Fd, [RepBuf, Data]) of
  347. ok ->
  348. State#state{reply_buffer = <<>>};
  349. {error, Reason} ->
  350. {error, {file_write_error, Reason}}
  351. end;
  352. accumulate_response(Data, #state{reply_buffer = RepBuf,
  353. rep_buf_size = RepBufSize,
  354. streamed_size = Streamed_size,
  355. cur_req = CurReq}=State) ->
  356. #request{stream_to = StreamTo,
  357. req_id = ReqId,
  358. stream_chunk_size = Stream_chunk_size,
  359. response_format = Response_format,
  360. caller_controls_socket = Caller_controls_socket} = CurReq,
  361. RepBuf_1 = <<RepBuf/binary, Data/binary>>,
  362. New_data_size = RepBufSize - Streamed_size,
  363. case StreamTo of
  364. undefined ->
  365. State#state{reply_buffer = RepBuf_1};
  366. _ when Caller_controls_socket == true ->
  367. do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
  368. State#state{reply_buffer = <<>>,
  369. interim_reply_sent = true,
  370. streamed_size = Streamed_size + size(RepBuf_1)};
  371. _ when New_data_size >= Stream_chunk_size ->
  372. {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
  373. do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
  374. State_1 = State#state{
  375. reply_buffer = <<>>,
  376. interim_reply_sent = true,
  377. streamed_size = Streamed_size + Stream_chunk_size},
  378. case Rem_data of
  379. <<>> ->
  380. State_1;
  381. _ ->
  382. accumulate_response(Rem_data, State_1)
  383. end;
  384. _ ->
  385. State#state{reply_buffer = RepBuf_1}
  386. end.
  387. make_tmp_filename(true) ->
  388. DownloadDir = ibrowse:get_config_value(download_dir, filename:absname("./")),
  389. {A,B,C} = now(),
  390. filename:join([DownloadDir,
  391. "ibrowse_tmp_file_"++
  392. integer_to_list(A) ++
  393. integer_to_list(B) ++
  394. integer_to_list(C)]);
  395. make_tmp_filename(File) when is_list(File) ->
  396. File.
  397. %%--------------------------------------------------------------------
  398. %% Handles the case when the server closes the socket
  399. %%--------------------------------------------------------------------
  400. handle_sock_closed(#state{status=get_header} = State) ->
  401. shutting_down(State),
  402. do_error_reply(State, connection_closed);
  403. handle_sock_closed(#state{cur_req=undefined} = State) ->
  404. shutting_down(State);
  405. %% We check for IsClosing because this the server could have sent a
  406. %% Connection-Close header and has closed the socket to indicate end
  407. %% of response. There maybe requests pipelined which need a response.
  408. handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC,
  409. is_closing = IsClosing,
  410. cur_req = #request{tmp_file_name=TmpFilename,
  411. tmp_file_fd=Fd} = CurReq,
  412. status = get_body,
  413. recvd_headers = Headers,
  414. status_line = Status_line,
  415. raw_headers = Raw_headers
  416. }=State) ->
  417. #request{from=From, stream_to=StreamTo, req_id=ReqId,
  418. response_format = Resp_format,
  419. options = Options} = CurReq,
  420. case IsClosing of
  421. true ->
  422. {_, Reqs_1} = queue:out(Reqs),
  423. Body = case TmpFilename of
  424. undefined ->
  425. Buf;
  426. _ ->
  427. file:close(Fd),
  428. {file, TmpFilename}
  429. end,
  430. Reply = case get_value(give_raw_headers, Options, false) of
  431. true ->
  432. {ok, Status_line, Raw_headers, Body};
  433. false ->
  434. {ok, SC, Headers, Buf}
  435. end,
  436. do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
  437. do_error_reply(State#state{reqs = Reqs_1}, connection_closed),
  438. State;
  439. _ ->
  440. do_error_reply(State, connection_closed),
  441. State
  442. end.
  443. do_connect(Host, Port, Options, #state{is_ssl = true,
  444. use_proxy = false,
  445. ssl_options = SSLOptions},
  446. Timeout) ->
  447. Caller_socket_options = get_value(socket_options, Options, []),
  448. Other_sock_options = filter_sock_options(SSLOptions ++ Caller_socket_options),
  449. ssl:connect(Host, Port,
  450. [binary, {nodelay, true}, {active, false} | Other_sock_options],
  451. Timeout);
  452. do_connect(Host, Port, Options, _State, Timeout) ->
  453. Caller_socket_options = get_value(socket_options, Options, []),
  454. Other_sock_options = filter_sock_options(Caller_socket_options),
  455. gen_tcp:connect(Host, to_integer(Port),
  456. [binary, {nodelay, true}, {active, false} | Other_sock_options],
  457. Timeout).
  458. %% We don't want the caller to specify certain options
  459. filter_sock_options(Opts) ->
  460. lists:filter(fun({active, _}) ->
  461. false;
  462. ({packet, _}) ->
  463. false;
  464. (list) ->
  465. false;
  466. (_) ->
  467. true
  468. end, Opts).
  469. do_send(Req, #state{socket = Sock,
  470. is_ssl = true,
  471. use_proxy = true,
  472. proxy_tunnel_setup = Pts}) when Pts /= done -> gen_tcp:send(Sock, Req);
  473. do_send(Req, #state{socket = Sock, is_ssl = true}) -> ssl:send(Sock, Req);
  474. do_send(Req, #state{socket = Sock, is_ssl = false}) -> gen_tcp:send(Sock, Req).
  475. %% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), IsSSL::boolean()) -> ok | error()
  476. %% source_descriptor() = fun_arity_0 |
  477. %% {fun_arity_0} |
  478. %% {fun_arity_1, term()}
  479. %% error() = term()
  480. do_send_body(Source, State, TE) when is_function(Source) ->
  481. do_send_body({Source}, State, TE);
  482. do_send_body({Source}, State, TE) when is_function(Source) ->
  483. do_send_body1(Source, Source(), State, TE);
  484. do_send_body({Source, Source_state}, State, TE) when is_function(Source) ->
  485. do_send_body1(Source, Source(Source_state), State, TE);
  486. do_send_body(Body, State, _TE) ->
  487. do_send(Body, State).
  488. do_send_body1(Source, Resp, State, TE) ->
  489. case Resp of
  490. {ok, Data} ->
  491. do_send(maybe_chunked_encode(Data, TE), State),
  492. do_send_body({Source}, State, TE);
  493. {ok, Data, New_source_state} ->
  494. do_send(maybe_chunked_encode(Data, TE), State),
  495. do_send_body({Source, New_source_state}, State, TE);
  496. eof when TE == true ->
  497. do_send(<<"0\r\n\r\n">>, State),
  498. ok;
  499. eof ->
  500. ok;
  501. Err ->
  502. Err
  503. end.
  504. maybe_chunked_encode(Data, false) ->
  505. Data;
  506. maybe_chunked_encode(Data, true) ->
  507. [?dec2hex(size(to_binary(Data))), "\r\n", Data, "\r\n"].
  508. do_close(#state{socket = undefined}) -> ok;
  509. do_close(#state{socket = Sock,
  510. is_ssl = true,
  511. use_proxy = true,
  512. proxy_tunnel_setup = Pts
  513. }) when Pts /= done -> catch gen_tcp:close(Sock);
  514. do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
  515. do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
  516. active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
  517. ok;
  518. active_once(#state{socket = Socket} = State) ->
  519. do_setopts(Socket, [{active, once}], State).
  520. do_setopts(_Sock, [], _) -> ok;
  521. do_setopts(Sock, Opts, #state{is_ssl = true,
  522. use_proxy = true,
  523. proxy_tunnel_setup = Pts}
  524. ) when Pts /= done -> inet:setopts(Sock, Opts);
  525. do_setopts(Sock, Opts, #state{is_ssl = true}) -> ssl:setopts(Sock, Opts);
  526. do_setopts(Sock, Opts, _) -> inet:setopts(Sock, Opts).
  527. check_ssl_options(Options, State) ->
  528. case get_value(is_ssl, Options, false) of
  529. false ->
  530. State;
  531. true ->
  532. State#state{is_ssl=true, ssl_options=get_value(ssl_options, Options)}
  533. end.
  534. send_req_1(From,
  535. #url{host = Host,
  536. port = Port} = Url,
  537. Headers, Method, Body, Options, Timeout,
  538. #state{socket = undefined} = State) ->
  539. {Host_1, Port_1, State_1} =
  540. case get_value(proxy_host, Options, false) of
  541. false ->
  542. {Host, Port, State};
  543. PHost ->
  544. ProxyUser = get_value(proxy_user, Options, []),
  545. ProxyPassword = get_value(proxy_password, Options, []),
  546. Digest = http_auth_digest(ProxyUser, ProxyPassword),
  547. {PHost, get_value(proxy_port, Options, 80),
  548. State#state{use_proxy = true,
  549. proxy_auth_digest = Digest}}
  550. end,
  551. State_2 = check_ssl_options(Options, State_1),
  552. do_trace("Connecting...~n", []),
  553. Conn_timeout = get_value(connect_timeout, Options, Timeout),
  554. case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
  555. {ok, Sock} ->
  556. do_trace("Connected! Socket: ~1000.p~n", [Sock]),
  557. State_3 = State_2#state{socket = Sock,
  558. connect_timeout = Conn_timeout},
  559. send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
  560. Err ->
  561. shutting_down(State_2),
  562. do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
  563. gen_server:reply(From, {error, {conn_failed, Err}}),
  564. {stop, normal, State_2}
  565. end;
  566. %% Send a CONNECT request.
  567. %% Wait for 200 OK
  568. %% Upgrade to SSL connection
  569. %% Then send request
  570. send_req_1(From,
  571. #url{
  572. host = Server_host,
  573. port = Server_port
  574. } = Url,
  575. Headers, Method, Body, Options, Timeout,
  576. #state{
  577. proxy_tunnel_setup = false,
  578. use_proxy = true,
  579. is_ssl = true} = State) ->
  580. NewReq = #request{
  581. method = connect,
  582. preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
  583. options = Options
  584. },
  585. State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
  586. Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
  587. Path = [Server_host, $:, integer_to_list(Server_port)],
  588. {Req, Body_1} = make_request(connect, Pxy_auth_headers,
  589. Path, Path,
  590. [], Options, State_1, undefined),
  591. TE = is_chunked_encoding_specified(Options),
  592. trace_request(Req),
  593. case do_send(Req, State) of
  594. ok ->
  595. case do_send_body(Body_1, State_1, TE) of
  596. ok ->
  597. trace_request_body(Body_1),
  598. active_once(State_1),
  599. Ref = case Timeout of
  600. infinity ->
  601. undefined;
  602. _ ->
  603. erlang:send_after(Timeout, self(), {req_timedout, From})
  604. end,
  605. State_2 = State_1#state{status = get_header,
  606. cur_req = NewReq,
  607. send_timer = Ref,
  608. proxy_tunnel_setup = in_progress,
  609. tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
  610. State_3 = set_inac_timer(State_2),
  611. {noreply, State_3};
  612. Err ->
  613. shutting_down(State_1),
  614. do_trace("Send failed... Reason: ~p~n", [Err]),
  615. gen_server:reply(From, {error, {send_failed, Err}}),
  616. {stop, normal, State_1}
  617. end;
  618. Err ->
  619. shutting_down(State_1),
  620. do_trace("Send failed... Reason: ~p~n", [Err]),
  621. gen_server:reply(From, {error, {send_failed, Err}}),
  622. {stop, normal, State_1}
  623. end;
  624. send_req_1(From, Url, Headers, Method, Body, Options, Timeout,
  625. #state{proxy_tunnel_setup = in_progress,
  626. tunnel_setup_queue = Q} = State) ->
  627. do_trace("Queued SSL request awaiting tunnel setup: ~n"
  628. "URL : ~s~n"
  629. "Method : ~p~n"
  630. "Headers : ~p~n", [Url, Method, Headers]),
  631. {noreply, State#state{tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout} | Q]}};
  632. send_req_1(From,
  633. #url{abspath = AbsPath,
  634. path = RelPath} = Url,
  635. Headers, Method, Body, Options, Timeout,
  636. #state{status = Status,
  637. socket = Socket} = State) ->
  638. ReqId = make_req_id(),
  639. Resp_format = get_value(response_format, Options, list),
  640. Caller_socket_options = get_value(socket_options, Options, []),
  641. {StreamTo, Caller_controls_socket} =
  642. case get_value(stream_to, Options, undefined) of
  643. {Caller, once} when is_pid(Caller) or
  644. is_atom(Caller) ->
  645. Async_pid_rec = {{req_id_pid, ReqId}, self()},
  646. true = ets:insert(ibrowse_stream, Async_pid_rec),
  647. {Caller, true};
  648. undefined ->
  649. {undefined, false};
  650. Caller when is_pid(Caller) or
  651. is_atom(Caller) ->
  652. {Caller, false};
  653. Stream_to_inv ->
  654. exit({invalid_option, {stream_to, Stream_to_inv}})
  655. end,
  656. SaveResponseToFile = get_value(save_response_to_file, Options, false),
  657. NewReq = #request{url = Url,
  658. method = Method,
  659. stream_to = StreamTo,
  660. caller_controls_socket = Caller_controls_socket,
  661. caller_socket_options = Caller_socket_options,
  662. options = Options,
  663. req_id = ReqId,
  664. save_response_to_file = SaveResponseToFile,
  665. stream_chunk_size = get_stream_chunk_size(Options),
  666. response_format = Resp_format,
  667. from = From,
  668. preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
  669. },
  670. State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
  671. Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
  672. {Req, Body_1} = make_request(Method,
  673. Headers_1,
  674. AbsPath, RelPath, Body, Options, State_1,
  675. ReqId),
  676. trace_request(Req),
  677. do_setopts(Socket, Caller_socket_options, State_1),
  678. TE = is_chunked_encoding_specified(Options),
  679. case do_send(Req, State_1) of
  680. ok ->
  681. case do_send_body(Body_1, State_1, TE) of
  682. ok ->
  683. trace_request_body(Body_1),
  684. State_2 = inc_pipeline_counter(State_1),
  685. active_once(State_2),
  686. Ref = case Timeout of
  687. infinity ->
  688. undefined;
  689. _ ->
  690. erlang:send_after(Timeout, self(), {req_timedout, From})
  691. end,
  692. State_3 = case Status of
  693. idle ->
  694. State_2#state{status = get_header,
  695. cur_req = NewReq,
  696. send_timer = Ref};
  697. _ ->
  698. State_2#state{send_timer = Ref}
  699. end,
  700. case StreamTo of
  701. undefined ->
  702. ok;
  703. _ ->
  704. gen_server:reply(From, {ibrowse_req_id, ReqId})
  705. end,
  706. State_4 = set_inac_timer(State_3),
  707. {noreply, State_4};
  708. Err ->
  709. shutting_down(State_1),
  710. do_trace("Send failed... Reason: ~p~n", [Err]),
  711. gen_server:reply(From, {error, {send_failed, Err}}),
  712. {stop, normal, State_1}
  713. end;
  714. Err ->
  715. shutting_down(State_1),
  716. do_trace("Send failed... Reason: ~p~n", [Err]),
  717. gen_server:reply(From, {error, {send_failed, Err}}),
  718. {stop, normal, State_1}
  719. end.
  720. maybe_modify_headers(#url{}, connect, _, Headers, State) ->
  721. add_proxy_auth_headers(State, Headers);
  722. maybe_modify_headers(#url{host = Host, port = Port} = Url,
  723. _Method,
  724. Options, Headers, State) ->
  725. case get_value(headers_as_is, Options, false) of
  726. false ->
  727. Headers_1 = add_auth_headers(Url, Options, Headers, State),
  728. HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
  729. false ->
  730. case Port of
  731. 80 -> Host;
  732. 443 -> Host;
  733. _ -> [Host, ":", integer_to_list(Port)]
  734. end;
  735. {value, {_, Host_h_val}} ->
  736. Host_h_val
  737. end,
  738. [{"Host", HostHeaderValue} | Headers_1];
  739. true ->
  740. Headers
  741. end.
  742. add_auth_headers(#url{username = User,
  743. password = UPw},
  744. Options,
  745. Headers,
  746. State) ->
  747. Headers_1 = case User of
  748. undefined ->
  749. case get_value(basic_auth, Options, undefined) of
  750. undefined ->
  751. Headers;
  752. {U,P} ->
  753. [{"Authorization", ["Basic ", http_auth_digest(U, P)]} | Headers]
  754. end;
  755. _ ->
  756. [{"Authorization", ["Basic ", http_auth_digest(User, UPw)]} | Headers]
  757. end,
  758. add_proxy_auth_headers(State, Headers_1).
  %% Prepends a Basic "Proxy-Authorization" header built from the digest
  %% stored in the connection state. The headers are returned unchanged when
  %% no proxy is in use or when the stored digest is empty.
  add_proxy_auth_headers(#state{use_proxy = UseProxy,
                                proxy_auth_digest = Auth_digest}, Headers) ->
      case UseProxy =:= false orelse Auth_digest =:= [] of
          true ->
              Headers;
          false ->
              [{"Proxy-Authorization", ["Basic ", Auth_digest]} | Headers]
      end.
  %% Returns the base64 encoding of "Username:Password" as used by HTTP
  %% Basic authentication. Empty credentials yield an empty digest.
  http_auth_digest([], []) ->
      [];
  http_auth_digest(Username, Password) ->
      Plain = Username ++ ":" ++ Password,
      ibrowse_lib:encode_base64(Plain).
  %% Builds the request to be written to the socket.
  %%
  %% Returns {RequestHead, Body_1} where RequestHead is an iolist holding the
  %% request line, the encoded headers and the terminating blank line, and
  %% Body_1 is the body (chunk-encoded when the `transfer_encoding' option
  %% asks for it).
  %%
  %% Header handling:
  %%   * a Content-Length header is added when the caller did not supply one
  %%     and the body is a binary or a list;
  %%   * with chunked transfer encoding any Content-Length header is dropped
  %%     and "Transfer-Encoding: chunked" is appended;
  %%   * the `{include_ibrowse_req_id, true}' option adds an
  %%     "x-ibrowse-request-id" header.
  %%
  %% The absolute URI is used only when a proxy (or `use_absolute_uri') is in
  %% effect and the connection is not SSL.
  make_request(Method, Headers, AbsPath, RelPath, Body, Options,
               #state{use_proxy = UseProxy, is_ssl = Is_ssl}, ReqId) ->
      VsnString = http_vsn_string(get_value(http_vsn, Options, {1,1})),
      %% Tag each header with its lower-cased name so that look-ups are
      %% case-insensitive while the caller's spelling is preserved.
      Tagged = lists:map(fun({Name, Val}) when is_atom(Name) ->
                                 {to_lower(atom_to_list(Name)), Name, Val};
                            ({Name, Val}) when is_list(Name) ->
                                 {to_lower(Name), Name, Val}
                         end, Headers),
      HasContentLength = lists:keymember("content-length", 1, Tagged),
      EmptyBody = (Body =:= [] orelse Body =:= <<>>),
      WithLength =
          if
              HasContentLength ->
                  %% Content-Length is already specified
                  Tagged;
              EmptyBody andalso (Method =:= post orelse Method =:= put) ->
                  [{"content-length", "Content-Length", "0"} | Tagged];
              is_binary(Body) orelse is_list(Body) ->
                  [{"content-length", "Content-Length",
                    integer_to_list(iolist_size(Body))} | Tagged];
              true ->
                  %% Body is a function or function/state pair
                  Tagged
          end,
      {PlainHeaders, Body_1} =
          case is_chunked_encoding_specified(Options) of
              true ->
                  ChunkSize = case get_value(transfer_encoding, Options) of
                                  {chunked, Size} -> Size;
                                  chunked         -> 5120
                              end,
                  Kept = [{Name, Val} || {LcName, Name, Val} <- WithLength,
                                         LcName /= "content-length"],
                  {Kept ++ [{"Transfer-Encoding", "chunked"}],
                   chunk_request_body(Body, ChunkSize)};
              false ->
                  {[{Name, Val} || {_, Name, Val} <- WithLength], Body}
          end,
      WithReqId =
          case lists:member({include_ibrowse_req_id, true}, Options) of
              false ->
                  PlainHeaders;
              true ->
                  [{"x-ibrowse-request-id", io_lib:format("~1000.p",[ReqId])}
                   | PlainHeaders]
          end,
      HeaderLines = cons_headers(WithReqId),
      Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
                false ->
                    RelPath;
                true ->
                    case Is_ssl of
                        false -> AbsPath;
                        true  -> RelPath
                    end
            end,
      {[method(Method), " ", Uri, " ", VsnString, crnl(), HeaderLines, crnl()],
       Body_1}.
  %% Returns true when the `transfer_encoding' option requests chunked
  %% encoding (either `chunked' or `{chunked, ChunkSize}'), and false when
  %% the option is absent. Any other value raises a case_clause error.
  is_chunked_encoding_specified(Options) ->
      case get_value(transfer_encoding, Options, false) of
          chunked      -> true;
          {chunked, _} -> true;
          false        -> false
      end.
  %% Renders a supported {Major, Minor} HTTP version as the string used on
  %% the request line. Only 0.9, 1.0 and 1.1 are accepted.
  http_vsn_string({Major, Minor} = Vsn)
    when Vsn =:= {0,9}; Vsn =:= {1,0}; Vsn =:= {1,1} ->
      "HTTP/" ++ integer_to_list(Major) ++ "." ++ integer_to_list(Minor).
  %% Expands the shorthand header tuples accepted from callers into real
  %% header name/value pairs and encodes the result with encode_headers/1:
  %%   {basic_auth, {U, P}} -> "Authorization: Basic <base64 of U:P>"
  %%   {cookie, C}          -> "Cookie"
  %%   {content_length, L}  -> "Content-Length"
  %%   {content_type, T}    -> "Content-Type"
  %% Every other element is passed through unchanged.
  cons_headers(Headers) ->
      cons_headers(Headers, []).

  %% Acc holds already expanded headers in reverse order;
  %% encode_headers/1 restores the original order.
  cons_headers(Headers, Acc) ->
      Expand = fun({basic_auth, {U, P}}) ->
                       {"Authorization",
                        ["Basic ", ibrowse_lib:encode_base64(U++":"++P)]};
                  ({cookie, Cookie}) ->
                       {"Cookie", Cookie};
                  ({content_length, Len}) ->
                       {"Content-Length", Len};
                  ({content_type, Type}) ->
                       {"Content-Type", Type};
                  (Other) ->
                       Other
               end,
      Expanded = lists:foldl(fun(Header, Collected) ->
                                     [Expand(Header) | Collected]
                             end, Acc, Headers),
      encode_headers(Expanded).
  %% Turns a reversed list of {Name, Value} headers into a list of
  %% "Name: Value\r\n" iolists in the opposite (i.e. original) order.
  %% {http_vsn, _} entries are not headers and are dropped. Names may be
  %% strings or atoms; values are rendered with fmt_val/1.
  encode_headers(L) ->
      encode_headers(L, []).

  encode_headers(Headers, Acc) ->
      Encode = fun({http_vsn, _Val}, Lines) ->
                       Lines;
                  ({Name, Val}, Lines) when is_list(Name) ->
                       [[Name, ": ", fmt_val(Val), crnl()] | Lines];
                  ({Name, Val}, Lines) when is_atom(Name) ->
                       [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | Lines]
               end,
      lists:reverse(lists:foldl(Encode, Acc, Headers)).
  %% Encodes a request body using HTTP chunked transfer encoding.
  %%
  %% Tuple and fun bodies are returned untouched. A binary or list body is
  %% cut into pieces of ChunkSize, each emitted as
  %% [HexSize, "\r\n", Data, "\r\n"], followed by the terminating "0\r\n"
  %% chunk and a final "\r\n". The result is an iolist.
  chunk_request_body(Body, _ChunkSize) when is_tuple(Body) orelse
                                            is_function(Body) ->
      Body;
  chunk_request_body(Body, ChunkSize) ->
      chunk_request_body(Body, ChunkSize, []).

  %% Acc holds the chunks produced so far, most recent first.
  chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
      lists:reverse(Acc, ["0\r\n", "\r\n"]);
  chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
                                                size(Body) >= ChunkSize ->
      {Piece, Remaining} = split_binary(Body, ChunkSize),
      Encoded = [?dec2hex(ChunkSize), "\r\n", Piece, "\r\n"],
      chunk_request_body(Remaining, ChunkSize, [Encoded | Acc]);
  chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
      %% Fewer than ChunkSize bytes left: emit them as the last data chunk.
      Encoded = [?dec2hex(size(Body)), "\r\n", Body, "\r\n"],
      lists:reverse(Acc, [Encoded, "0\r\n", "\r\n"]);
  chunk_request_body(Body, ChunkSize, Acc) when length(Body) >= ChunkSize ->
      {Piece, Remaining} = split_list_at(Body, ChunkSize),
      Encoded = [?dec2hex(ChunkSize), "\r\n", Piece, "\r\n"],
      chunk_request_body(Remaining, ChunkSize, [Encoded | Acc]);
  chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
      Encoded = [?dec2hex(length(Body)), "\r\n", Body, "\r\n"],
      lists:reverse(Acc, [Encoded, "0\r\n", "\r\n"]).
  %% Parses the start of a response for the current request.
  %%
  %% Data is appended to the buffered bytes and searched for the end of the
  %% header section. Until it is found the bytes are kept in `reply_buffer'
  %% (subject to the `max_headers_size' configuration value). Once the
  %% headers are complete they are parsed and the body is handled according
  %% to the request method, status code, Transfer-Encoding and
  %% Content-Length.
  %%
  %% Returns the updated #state{} or {error, Reason}.
  parse_response(_Data, #state{cur_req = undefined} = State) ->
      %% Nothing is waiting for a response
      State#state{status = idle};
  parse_response(Data, #state{reply_buffer = Buffered, reqs = Reqs,
                              cur_req = CurReq} = State) ->
      #request{from = From, stream_to = StreamTo, req_id = ReqId,
               method = Method, response_format = Resp_format,
               options = Options} = CurReq,
      MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
      case scan_header(Buffered, Data) of
          {yes, Headers, Rest} ->
              do_trace("Recvd Header Data -> ~s~n----~n", [Headers]),
              do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]),
              {HttpVsn, StatCode, Headers_1, Status_line, Raw_headers} =
                  parse_headers(Headers),
              do_trace("HttpVsn: ~p StatusCode: ~p Headers_1 -> ~1000.p~n",
                       [HttpVsn, StatCode, Headers_1]),
              LCHeaders = [{to_lower(Name), Val} || {Name, Val} <- Headers_1],
              ConnClose = to_lower(get_value("connection", LCHeaders, "false")),
              IsClosing = is_connection_closing(HttpVsn, ConnClose),
              case IsClosing of
                  true  -> shutting_down(State);
                  false -> ok
              end,
              Give_raw_headers = get_value(give_raw_headers, Options, false),
              Base = State#state{recvd_headers = Headers_1,
                                 status = get_body,
                                 reply_buffer = <<>>,
                                 http_status_code = StatCode,
                                 is_closing = IsClosing},
              %% The status line and raw headers are stored only when the
              %% caller asked for them.
              State_1 = case Give_raw_headers of
                            false ->
                                Base;
                            true ->
                                Base#state{status_line = Status_line,
                                           raw_headers = Raw_headers}
                        end,
              put(conn_close, ConnClose),
              TransferEncoding =
                  to_lower(get_value("transfer-encoding", LCHeaders, "false")),
              {_, RemainingReqs} = queue:out(Reqs),
              NotifyHeaders =
                  fun() ->
                          send_async_headers(ReqId, StreamTo,
                                             Give_raw_headers, State_1)
                  end,
              %% Completes a response that carries no body and moves on to
              %% the next pipelined request.
              ReplyWithoutBody =
                  fun() ->
                          NotifyHeaders(),
                          Replied = do_reply(State_1, From, StreamTo, ReqId,
                                             Resp_format,
                                             {ok, StatCode, Headers_1, []}),
                          cancel_timer(Replied#state.send_timer,
                                       {eat_message, {req_timedout, From}}),
                          Cleared = reset_state(Replied),
                          Next = set_cur_request(
                                   Cleared#state{reqs = RemainingReqs}),
                          parse_response(Rest, Next)
                  end,
              FailBody =
                  fun(Why) ->
                          fail_pipelined_requests(
                            State_1,
                            {error, {Why, {stat_code, StatCode}, Headers_1}}),
                          {error, Why}
                  end,
              FailLength =
                  fun() ->
                          fail_pipelined_requests(
                            State_1,
                            {error, {content_length_undefined,
                                     {stat_code, StatCode}, Headers}}),
                          {error, content_length_undefined}
                  end,
              case get_value("content-length", LCHeaders, undefined) of
                  _ when Method == connect,
                         hd(StatCode) == $2 ->
                      %% Proxy tunnel established
                      cancel_timer(State#state.send_timer),
                      upgrade_to_ssl(
                        set_cur_request(State#state{reqs = RemainingReqs,
                                                    recvd_headers = [],
                                                    status = idle}));
                  _ when Method == connect ->
                      do_error_reply(State#state{reqs = RemainingReqs},
                                     {error, proxy_tunnel_failed}),
                      {error, proxy_tunnel_failed};
                  _ when Method == head ->
                      ReplyWithoutBody();
                  _ when hd(StatCode) =:= $1 ->
                      %% No message body is expected. Server may send
                      %% one or more 1XX responses before a proper
                      %% response.
                      NotifyHeaders(),
                      do_trace("Recvd a status code of ~p. Ignoring and waiting for a proper response~n", [StatCode]),
                      parse_response(Rest, State_1#state{recvd_headers = [],
                                                         status = get_header});
                  _ when StatCode =:= "204";
                         StatCode =:= "304" ->
                      %% No message body is expected for these Status Codes.
                      %% RFC2616 - Sec 4.4
                      ReplyWithoutBody();
                  _ when TransferEncoding =:= "chunked" ->
                      do_trace("Chunked encoding detected...~n",[]),
                      NotifyHeaders(),
                      ChunkedState = State_1#state{transfer_encoding = chunked,
                                                   chunk_size = chunk_start,
                                                   reply_buffer = <<>>},
                      case parse_11_response(Rest, ChunkedState) of
                          {error, ChunkErr} ->
                              FailBody(ChunkErr);
                          AfterChunks ->
                              AfterChunks
                      end;
                  undefined when HttpVsn =:= "HTTP/1.0";
                                 ConnClose =:= "close" ->
                      %% The body is whatever arrives before the connection
                      %% is closed
                      NotifyHeaders(),
                      State_1#state{reply_buffer = Rest};
                  undefined ->
                      FailLength();
                  V ->
                      case catch list_to_integer(V) of
                          Len when is_integer(Len), Len >= 0 ->
                              NotifyHeaders(),
                              do_trace("Recvd Content-Length of ~p~n", [Len]),
                              SizedState = State_1#state{rep_buf_size = 0,
                                                         reply_buffer = <<>>,
                                                         content_length = Len},
                              case parse_11_response(Rest, SizedState) of
                                  {error, BodyErr} ->
                                      FailBody(BodyErr);
                                  AfterBody ->
                                      AfterBody
                              end;
                          _ ->
                              FailLength()
                      end
              end;
          {no, Pending} when MaxHeaderSize == infinity;
                             size(Pending) < MaxHeaderSize ->
              %% Headers are still incomplete
              State#state{reply_buffer = Pending};
          {no, _} ->
              fail_pipelined_requests(State, {error, max_headers_size_exceeded}),
              {error, max_headers_size_exceeded}
      end.
  %% Upgrades the established socket to SSL (used once a proxy CONNECT has
  %% succeeded). On success the requests queued while the tunnel was being
  %% set up are sent, oldest first. On failure every waiting caller gets an
  %% error reply and {error, send_failed} is returned.
  upgrade_to_ssl(#state{socket = PlainSocket,
                        connect_timeout = Timeout,
                        ssl_options = SslOpts,
                        tunnel_setup_queue = Queued} = State) ->
      case ssl:connect(PlainSocket, SslOpts, Timeout) of
          {ok, SslSocket} ->
              do_trace("Upgraded to SSL socket!!~n", []),
              Upgraded = State#state{socket = SslSocket,
                                     proxy_tunnel_setup = done},
              send_queued_requests(lists:reverse(Queued), Upgraded);
          Failure ->
              do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Failure]),
              do_error_reply(State, {error, {send_failed, Failure}}),
              {error, send_failed}
      end.
  %% Sends, in list order, the requests that were queued while the proxy
  %% tunnel was being set up. Returns the state with an emptied
  %% `tunnel_setup_queue' once all have been sent, or {error, send_failed}
  %% (after replying with an error to the waiting callers) as soon as one
  %% send does not return {noreply, State}.
  send_queued_requests([], State) ->
      do_trace("Sent all queued requests via SSL connection~n", []),
      State#state{tunnel_setup_queue = []};
  send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout}
                        | Remaining], State) ->
      Outcome = send_req_1(From, Url, Headers, Method, Body, Options,
                           Timeout, State),
      case Outcome of
          {noreply, NextState} ->
              send_queued_requests(Remaining, NextState);
          Failure ->
              do_trace("Error sending queued SSL request: ~nURL : ~s~nMethod : ~p~nHeaders : ~p~n",
                       [Url, Method, Headers]),
              do_error_reply(State, {error, {send_failed, Failure}}),
              {error, send_failed}
      end.
  %% Tells whether the server will close the connection after the current
  %% response. ConnHeader is the lower-cased "Connection" header value, or
  %% "false" when the header is absent. The connection closes for any
  %% HTTP/0.9 response, for "Connection: close", and for HTTP/1.0 responses
  %% without a Connection header.
  is_connection_closing(HttpVsn, ConnHeader) ->
      HttpVsn =:= "HTTP/0.9"
          orelse ConnHeader =:= "close"
          orelse (HttpVsn =:= "HTTP/1.0" andalso ConnHeader =:= "false").
  %% Consumes response body bytes for the current request. Returns the
  %% updated #state{} or {error, Reason}.
  %%
  %% The clauses below form a small state machine driven by the
  %% `transfer_encoding' and `chunk_size' fields of the state:
  %%   chunk_start -> read the chunk-size line
  %%   integer > 0 -> read that many bytes of chunk data
  %%   tbd         -> skip the CRLF that follows the chunk data
  %%   0           -> last chunk seen; read (and discard) the trailer
  %% The final clause handles bodies delimited by Content-Length.

  %% This clause determines the chunk size when given data from the
  %% beginning of the chunk
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = chunk_start,
                           chunk_size_buffer = Chunk_sz_buf
                          } = State) ->
      case scan_crlf(Chunk_sz_buf, DataRecvd) of
          {yes, ChunkHeader, Data_1} ->
              State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
              ChunkSize = parse_chunk_header(ChunkHeader),
              %%
              %% Do we have to preserve the chunk encoding when
              %% streaming? NO. This should be transparent to the client
              %% process. Chunked encoding was only introduced to make
              %% it efficient for the server.
              %%
              RemLen = size(Data_1),
              do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
                       [ChunkSize, RemLen]),
              parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
                                                      deleted_crlf = true,
                                                      recvd_chunk_size = 0,
                                                      chunk_size = ChunkSize});
          {no, Data_1} ->
              %% The size line is incomplete; keep it until more data arrives
              State#state{chunk_size_buffer = Data_1}
      end;

  %% This clause is to remove the CRLF between two chunks
  %%
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = tbd,
                           chunk_size_buffer = Buf
                          } = State) ->
      case scan_crlf(Buf, DataRecvd) of
          {yes, _, NextChunk} ->
              State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
              State_2 = State_1#state{chunk_size = chunk_start,
                                      chunk_size_buffer = <<>>,
                                      deleted_crlf = true},
              parse_11_response(NextChunk, State_2);
          {no, Data_1} ->
              State#state{chunk_size_buffer = Data_1}
      end;

  %% This clause deals with the end of a chunked transfer. ibrowse does
  %% not support Trailers in the Chunked Transfer encoding. Any trailer
  %% received is silently discarded.
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked, chunk_size = 0,
                           cur_req = CurReq,
                           deleted_crlf = DelCrlf,
                           chunk_size_buffer = Trailer,
                           reqs = Reqs} = State) ->
      do_trace("Detected end of chunked transfer...~n", []),
      %% The CRLF ending the chunk-size line has already been consumed.
      %% Put it back so that an empty trailer is seen as CRLF CRLF by
      %% scan_header/2.
      DataRecvd_1 = case DelCrlf of
                        false ->
                            DataRecvd;
                        true ->
                            <<$\r, $\n, DataRecvd/binary>>
                    end,
      case scan_header(Trailer, DataRecvd_1) of
          {yes, TEHeaders, Rem} ->
              {_, Reqs_1} = queue:out(Reqs),
              State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
              State_2 = handle_response(CurReq,
                                        State_1#state{reqs = Reqs_1}),
              parse_response(Rem, reset_state(State_2));
          {no, Rem} ->
              accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
      end;

  %% This clause extracts a chunk, given the size.
  parse_11_response(DataRecvd,
                    #state{transfer_encoding = chunked,
                           chunk_size = CSz,
                           recvd_chunk_size = Recvd_csz,
                           rep_buf_size = RepBufSz} = State) ->
      NeedBytes = CSz - Recvd_csz,
      DataLen = size(DataRecvd),
      do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]),
      case DataLen >= NeedBytes of
          true ->
              {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
              do_trace("Recvd another chunk...~p~n", [RemChunk]),
              do_trace("RemData -> ~p~n", [RemData]),
              case accumulate_response(RemChunk, State) of
                  {error, Reason} ->
                      do_trace("Error accumulating response --> ~p~n", [Reason]),
                      {error, Reason};
                  #state{} = State_1 ->
                      State_2 = State_1#state{chunk_size=tbd},
                      parse_11_response(RemData, State_2)
              end;
          false ->
              accumulate_response(DataRecvd,
                                  State#state{rep_buf_size = RepBufSz + DataLen,
                                              recvd_chunk_size = Recvd_csz + DataLen})
      end;

  %% This clause to extract the body when Content-Length is specified
  parse_11_response(DataRecvd,
                    #state{content_length=CL, rep_buf_size=RepBufSz,
                           reqs=Reqs}=State) ->
      NeedBytes = CL - RepBufSz,
      DataLen = size(DataRecvd),
      case DataLen >= NeedBytes of
          true ->
              {RemBody, Rem} = split_binary(DataRecvd, NeedBytes),
              %% accumulate_response/2 can fail. Propagate the error the
              %% same way the chunk clause above does instead of treating
              %% the error tuple as a state record.
              case accumulate_response(RemBody, State) of
                  {error, Reason} ->
                      do_trace("Error accumulating response --> ~p~n", [Reason]),
                      {error, Reason};
                  #state{} = State_1 ->
                      {_, Reqs_1} = queue:out(Reqs),
                      State_2 = handle_response(State_1#state.cur_req,
                                                State_1#state{reqs=Reqs_1}),
                      State_3 = reset_state(State_2),
                      %% Whatever follows belongs to the next response
                      parse_response(Rem, State_3)
              end;
          false ->
              accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
      end.
  %% Adds chunked-encoding framing bytes (size lines, CRLFs, trailers) to
  %% the response unless the current request has
  %% `preserve_chunked_encoding' set to false, in which case the state is
  %% returned unchanged.
  maybe_accumulate_ce_data(State, Data) ->
      case State of
          #state{cur_req = #request{preserve_chunked_encoding = false}} ->
              State;
          _ ->
              accumulate_response(Data, State)
      end.
  %% Delivers a completed response for the given request and makes the next
  %% queued request current.
  %%
  %% When the request saves its response to a file, the file descriptor is
  %% closed and the body is reported as {file, TmpFilename} (or as the
  %% buffered data when no temporary file name is set). Otherwise the
  %% buffered data is the body. Depending on the `give_raw_headers' option
  %% the reply carries either the status line and raw headers or the status
  %% code and parsed headers. The request timer is cancelled afterwards.
  handle_response(#request{from = From, stream_to = StreamTo, req_id = ReqId,
                           response_format = Resp_format,
                           save_response_to_file = SaveResponseToFile,
                           tmp_file_name = TmpFilename,
                           tmp_file_fd = Fd,
                           options = Options},
                  #state{http_status_code = SCode,
                         status_line = Status_line,
                         raw_headers = Raw_headers,
                         send_timer = ReqTimer,
                         reply_buffer = RepBuf,
                         recvd_headers = RespHeaders} = State) ->
      ResponseBody =
          case SaveResponseToFile of
              false ->
                  RepBuf;
              _ ->
                  file:close(Fd),
                  case TmpFilename of
                      undefined -> RepBuf;
                      _         -> {file, TmpFilename}
                  end
          end,
      {Resp_headers_1, Raw_headers_1} =
          maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
      Reply = case get_value(give_raw_headers, Options, false) of
                  false ->
                      {ok, SCode, Resp_headers_1, ResponseBody};
                  true ->
                      {ok, Status_line, Raw_headers_1, ResponseBody}
              end,
      Replied = do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
      cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
      set_cur_request(Replied).
  %% Clears every per-response field of the connection state so that the
  %% next response starts from a clean slate, with `status' set to
  %% get_header.
  reset_state(State) ->
      State#state{
        %% parser position
        status            = get_header,
        transfer_encoding = undefined,
        chunk_size        = undefined,
        chunk_size_buffer = <<>>,
        deleted_crlf      = false,
        content_length    = undefined,
        %% accumulated data
        reply_buffer      = <<>>,
        rep_buf_size      = 0,
        streamed_size     = 0,
        %% parsed response metadata
        http_status_code  = undefined,
        status_line       = undefined,
        raw_headers       = undefined,
        recvd_headers     = []
       }.
  %% Makes the request at the head of the `reqs' queue the current request,
  %% or clears `cur_req' when the queue is empty.
  %%
  %% When the next request has `caller_controls_socket' set to true, the
  %% socket is switched to {active, once} via do_setopts/3.
  %%
  %% queue:peek/1 is used to look at the head of the queue; converting the
  %% whole queue to a list just to read its first element is wasted work
  %% on every completed response.
  set_cur_request(#state{reqs = Reqs, socket = Socket} = State) ->
      case queue:peek(Reqs) of
          empty ->
              State#state{cur_req = undefined};
          {value, #request{caller_controls_socket = Ccs} = NextReq} ->
              case Ccs of
                  true ->
                      do_setopts(Socket, [{active, once}], State);
                  _ ->
                      ok
              end,
              State#state{cur_req = NextReq}
      end.
  %% Splits a raw header block (everything before the blank line) into its
  %% status line and header lines and parses both.
  %%
  %% Returns {HttpVsn, StatCode, ParsedHeaders, StatusLine, RawHeaders}:
  %%   HttpVsn       - protocol version string, e.g. "HTTP/1.1"
  %%   StatCode      - status code as a string, or `undefined' when the
  %%                   status line could not be parsed
  %%   ParsedHeaders - list of {Name, Value} strings
  %%   StatusLine    - the unparsed status line
  %%   RawHeaders    - the unparsed header lines
  %% The protocol version is also stored in the process dictionary under
  %% `http_prot_vsn'.
  parse_headers(Headers) ->
      case scan_crlf(Headers) of
          {yes, StatusLine, T} ->
              parse_headers(StatusLine, T);
          {no, StatusLine} ->
              parse_headers(StatusLine, <<>>)
      end.

  parse_headers(StatusLine, Headers) ->
      Headers_1 = parse_headers_1(Headers),
      case parse_status_line(StatusLine) of
          {ok, HttpVsn, StatCode, _Msg} ->
              put(http_prot_vsn, HttpVsn),
              {HttpVsn, StatCode, Headers_1, StatusLine, Headers};
          _ -> %% A HTTP 0.9 response?
              put(http_prot_vsn, "HTTP/0.9"),
              %% The third element must be the parsed header list, as in
              %% the clause above: parse_response/2 iterates over it with a
              %% list comprehension and would fail on the raw binary.
              {"HTTP/0.9", undefined, Headers_1, StatusLine, Headers}
      end.
  %% From RFC 2616
  %%
  %%    HTTP/1.1 header field values can be folded onto multiple lines if
  %%    the continuation line begins with a space or horizontal tab. All
  %%    linear white space, including folding, has the same semantics as
  %%    SP. A recipient MAY replace any linear white space with a single
  %%    SP before interpreting the field value or forwarding the message
  %%    downstream.
  %%
  %% Parses the header lines of a response (binary or string) into a list
  %% of {Name, Value} pairs in their original order. Folded lines are
  %% joined with a single space; lines that parse_header/1 rejects are
  %% skipped.
  parse_headers_1(B) when is_binary(B) ->
      parse_headers_1(binary_to_list(B));
  parse_headers_1(String) ->
      parse_headers_1(String, [], []).

  %% Second argument: characters of the line being read, in reverse order.
  %% Third argument: headers parsed so far, in reverse order.
  parse_headers_1([$\n, Next | Rest], [$\r | RevLine], Parsed)
    when Next =:= 32; Next =:= $\t ->
      %% Folded line: replace the CRLF and leading white space by one SP
      Unfolded = lists:dropwhile(fun is_whitespace/1, Rest),
      parse_headers_1(Unfolded, [32 | RevLine], Parsed);
  parse_headers_1([$\n | Rest], [$\r | RevLine], Parsed) ->
      parse_headers_1(Rest, [], collect_header(RevLine, Parsed));
  parse_headers_1([Char | Rest], RevLine, Parsed) ->
      parse_headers_1(Rest, [Char | RevLine], Parsed);
  parse_headers_1([], [], Parsed) ->
      lists:reverse(Parsed);
  parse_headers_1([], RevLine, Parsed) ->
      %% Last line without a terminating CRLF
      lists:reverse(collect_header(RevLine, Parsed)).

  %% Parses one reversed header line and adds it to the parsed headers
  %% unless it is invalid.
  collect_header(RevLine, Parsed) ->
      case parse_header(lists:reverse(RevLine)) of
          invalid -> Parsed;
          Header  -> [Header | Parsed]
      end.
  %% Splits a status line such as "HTTP/1.1 200 OK" at its first two
  %% spaces. Accepts a binary or a string.
  %%
  %% Returns {ok, ProtocolVersion, StatusCode, ReasonPhrase} (all strings),
  %% or `http_09' when the line contains no space at all.
  parse_status_line(Line) when is_binary(Line) ->
      parse_status_line(binary_to_list(Line));
  parse_status_line(Line) ->
      NotSpace = fun(Char) -> Char =/= 32 end,
      case lists:splitwith(NotSpace, Line) of
          {_, []} ->
              http_09;
          {ProtVsn, [32 | AfterVsn]} ->
              case lists:splitwith(NotSpace, AfterVsn) of
                  {StatCode, []} ->
                      {ok, ProtVsn, StatCode, []};
                  {StatCode, [32 | Msg]} ->
                      {ok, ProtVsn, StatCode, Msg}
              end
      end.
  %% Splits one header line at its first colon. Returns {Name, Value} with
  %% leading and trailing spaces stripped from Value, or `invalid' when the
  %% line contains no colon.
  parse_header(L) ->
      case lists:splitwith(fun(Char) -> Char =/= $: end, L) of
          {Name, [$: | Value]} ->
              {Name, string:strip(Value)};
          {_, []} ->
              invalid
      end.
  %% Looks for the blank line (CRLF CRLF) that ends a header block.
  %% Returns {yes, Headers, Body} with the separator removed, or {no, Bin}
  %% when the separator is not present.
  scan_header(Bin) ->
      case get_crlf_crlf_pos(Bin, 0) of
          no ->
              {no, Bin};
          {yes, Pos} ->
              <<Headers:Pos/binary, _:4/binary, Body/binary>> = Bin,
              {yes, Headers, Body}
      end.

  %% Same as scan_header/1 for the concatenation of Bin1 and Bin2, where
  %% Bin1 is data buffered earlier. Only the last four bytes of Bin1 are
  %% scanned again together with Bin2, because a separator could straddle
  %% the boundary between the two binaries.
  scan_header(Bin1, Bin2) when byte_size(Bin1) < 4 ->
      scan_header(<<Bin1/binary, Bin2/binary>>);
  scan_header(Bin1, <<>>) ->
      scan_header(Bin1);
  scan_header(Bin1, Bin2) ->
      {Scanned, Unscanned} = split_binary(Bin1, byte_size(Bin1) - 4),
      Window = <<Unscanned/binary, Bin2/binary>>,
      case get_crlf_crlf_pos(Window, 0) of
          no ->
              {no, <<Bin1/binary, Bin2/binary>>};
          {yes, Pos} ->
              <<HeadersEnd:Pos/binary, _:4/binary, Body/binary>> = Window,
              {yes, <<Scanned/binary, HeadersEnd/binary>>, Body}
      end.

  %% Returns {yes, Offset} for the first CRLF CRLF in the binary, where
  %% Offset is counted from the initial value of Pos, or `no'.
  get_crlf_crlf_pos(<<>>, _Pos) ->
      no;
  get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) ->
      {yes, Pos};
  get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) ->
      get_crlf_crlf_pos(Rest, Pos + 1).
  %% Looks for the first CRLF in a binary. Returns {yes, Before, After}
  %% with the CRLF removed, or {no, Bin} when there is none.
  scan_crlf(Bin) ->
      case get_crlf_pos(Bin) of
          no ->
              {no, Bin};
          {yes, Pos} ->
              <<Line:Pos/binary, _Cr, _Lf, Tail/binary>> = Bin,
              {yes, Line, Tail}
      end.

  %% Same as scan_crlf/1 for the concatenation of Bin1 and Bin2, where Bin1
  %% is data buffered earlier. Only the last two bytes of Bin1 are scanned
  %% again together with Bin2.
  scan_crlf(<<>>, Bin2) ->
      scan_crlf(Bin2);
  scan_crlf(Bin1, Bin2) when byte_size(Bin1) < 2 ->
      scan_crlf(<<Bin1/binary, Bin2/binary>>);
  scan_crlf(Bin1, Bin2) ->
      scan_crlf_1(byte_size(Bin1) - 2, Bin1, Bin2).

  %% Bin1_head_size is the number of leading bytes of Bin1 that are not
  %% scanned again.
  scan_crlf_1(Bin1_head_size, Bin1, Bin2) ->
      {Scanned, Unscanned} = split_binary(Bin1, Bin1_head_size),
      Window = <<Unscanned/binary, Bin2/binary>>,
      case get_crlf_pos(Window) of
          no ->
              {no, <<Bin1/binary, Bin2/binary>>};
          {yes, Pos} ->
              <<LineEnd:Pos/binary, _Cr, _Lf, Tail/binary>> = Window,
              {yes, <<Scanned/binary, LineEnd/binary>>, Tail}
      end.

  %% Returns {yes, Offset} for the first CRLF in the binary, or `no'.
  get_crlf_pos(Bin) ->
      get_crlf_pos(Bin, 0).

  get_crlf_pos(<<>>, _Pos) ->
      no;
  get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) ->
      {yes, Pos};
  get_crlf_pos(<<_, Rest/binary>>, Pos) ->
      get_crlf_pos(Rest, Pos + 1).
  %% Renders a header value as iodata suitable for the request head.
  %% Strings and binaries are used as they are, integers and atoms are
  %% converted to their textual form, and any other term is printed
  %% with ~p.
  fmt_val(L) when is_list(L)    -> L;
  fmt_val(I) when is_integer(I) -> integer_to_list(I);
  fmt_val(A) when is_atom(A)    -> atom_to_list(A);
  %% A binary is already valid iodata. Printing it with ~p would put the
  %% literal text <<"...">> on the wire.
  fmt_val(B) when is_binary(B)  -> B;
  fmt_val(Term)                 -> io_lib:format("~p", [Term]).
  %% The line terminator used in HTTP messages: carriage return followed by
  %% line feed.
  crnl() ->
      [$\r, $\n].
  %% Maps a supported request method atom to the upper-case token written
  %% on the request line. Any other value raises function_clause.
  method(Method) when Method =:= get;
                      Method =:= post;
                      Method =:= head;
                      Method =:= options;
                      Method =:= put;
                      Method =:= delete;
                      Method =:= trace;
                      Method =:= mkcol;
                      Method =:= propfind;
                      Method =:= proppatch;
                      Method =:= lock;
                      Method =:= unlock;
                      Method =:= move;
                      Method =:= copy;
                      Method =:= connect ->
      string:to_upper(atom_to_list(Method)).
  %% From RFC 2616
  %%
  %%    The chunked encoding modifies the body of a message in order to
  %%    transfer it as a series of chunks, each with its own size indicator,
  %%    followed by an OPTIONAL trailer containing entity-header
  %%    fields. This allows dynamically produced content to be transferred
  %%    along with the information necessary for the recipient to verify
  %%    that it has received the full message.
  %%
  %%        Chunked-Body   = *chunk
  %%                         last-chunk
  %%                         trailer
  %%                         CRLF
  %%        chunk          = chunk-size [ chunk-extension ] CRLF
  %%                         chunk-data CRLF
  %%        chunk-size     = 1*HEX
  %%        last-chunk     = 1*("0") [ chunk-extension ] CRLF
  %%        chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
  %%        chunk-ext-name = token
  %%        chunk-ext-val  = token | quoted-string
  %%        chunk-data     = chunk-size(OCTET)
  %%        trailer        = *(entity-header CRLF)
  %%
  %%    The chunk-size field is a string of hex digits indicating the size
  %%    of the chunk. The chunked encoding is ended by any chunk whose size
  %%    is zero, followed by the trailer, which is terminated by an empty
  %%    line.
  %%
  %% The parsing implemented here discards all chunk extensions. It also
  %% strips trailing spaces from the chunk size fields as Apache 1.3.27 was
  %% sending them.
  %%
  %% Takes the chunk-size line (a binary, without its CRLF) and returns the
  %% chunk size as computed by hexlist_to_integer/1. Everything from the
  %% first ";" onwards is ignored and white space is removed from the size
  %% field before conversion.
  parse_chunk_header(ChunkHeader) ->
      SizeField = lists:takewhile(fun(Char) -> Char =/= $; end,
                                  binary_to_list(ChunkHeader)),
      hexlist_to_integer([Char || Char <- SizeField,
                                  not is_whitespace(Char)]).
  %% True for space, carriage return, line feed and horizontal tab; false
  %% for anything else.
  is_whitespace(Char) when Char =:= $\s;
                           Char =:= $\r;
                           Char =:= $\n;
                           Char =:= $\t ->
      true;
  is_whitespace(_Other) ->
      false.
  %% Sends the response headers to the process receiving a streamed
  %% response as an `ibrowse_async_headers' message. Does nothing when the
  %% request has no stream target.
  %%
  %% With Give_raw_headers the message carries the status line and the raw
  %% headers; otherwise the status code and the parsed headers. Headers
  %% configured through the `add_custom_headers' request option are
  %% included in both cases.
  send_async_headers(_ReqId, undefined, _, _State) ->
      ok;
  send_async_headers(ReqId, StreamTo, Give_raw_headers,
                     #state{status_line = Status_line,
                            raw_headers = Raw_headers,
                            recvd_headers = Headers,
                            http_status_code = StatCode,
                            cur_req = #request{options = Opts}}) ->
      {Headers_1, Raw_headers_1} =
          maybe_add_custom_headers(Headers, Raw_headers, Opts),
      Notification =
          case Give_raw_headers of
              true ->
                  {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1};
              false ->
                  {ibrowse_async_headers, ReqId, StatCode, Headers_1}
          end,
      catch StreamTo ! Notification.
  %% Appends the headers given in the `add_custom_headers' option to both
  %% representations of the response headers. Returns
  %% {ParsedHeaders, RawHeaders}.
  %%
  %% The parsed list always gets the custom headers appended. The raw
  %% headers are extended with "Name:Value" lines separated by CRLF only
  %% when there is at least one custom header and the raw headers are a
  %% binary.
  maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
      Custom_headers = get_value(add_custom_headers, Opts, []),
      AllHeaders = Headers ++ Custom_headers,
      AllRaw =
          if
              is_binary(Raw_headers),
              is_list(Custom_headers),
              Custom_headers =/= [] ->
                  Lines = [[Name, $:, Val] || {Name, Val} <- Custom_headers],
                  Extra = list_to_binary(string:join(Lines, "\r\n")),
                  <<Raw_headers/binary, "\r\n", Extra/binary>>;
              true ->
                  Raw_headers
          end,
      {AllHeaders, AllRaw}.
  %% Converts response data to the representation the caller asked for
  %% (`list' or `binary'). Data that is already in the requested form, or
  %% that is neither a list nor a binary, is returned unchanged.
  format_response_data(Resp_format, Body) ->
      if
          Resp_format =:= list, is_list(Body) ->
              flatten(Body);
          Resp_format =:= list, is_binary(Body) ->
              binary_to_list(Body);
          Resp_format =:= binary, is_list(Body) ->
              list_to_binary(Body);
          true ->
              %% This is to cater for sending messages such as
              %% {chunk_start, _}, chunk_end etc
              Body
      end.
  %% Delivers a reply for one request and returns the state produced by
  %% dec_pipeline_counter/1.
  %%
  %% * Without a stream target the caller blocked in gen_server:call is
  %%   answered directly; the body of an {ok, ...} reply is converted to
  %%   the requested response format first.
  %% * With a stream target, a complete {ok, ...} response is delivered as
  %%   an optional `ibrowse_async_response' message carrying the body,
  %%   followed by `ibrowse_async_response_end'.
  %% * Any other message for a stream target is sent as a single
  %%   `ibrowse_async_response'.
  do_reply(State, From, undefined, _ReqId, Resp_format, Msg) ->
      Reply = case Msg of
                  {ok, St_code, Headers, Body} ->
                      {ok, St_code, Headers,
                       format_response_data(Resp_format, Body)};
                  _ ->
                      Msg
              end,
      gen_server:reply(From, Reply),
      dec_pipeline_counter(State);
  do_reply(#state{prev_req_id = Prev_req_id} = State,
           _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
      Counted = dec_pipeline_counter(State),
      if
          Body =:= [] ->
              ok;
          true ->
              Formatted = format_response_data(Resp_format, Body),
              catch StreamTo ! {ibrowse_async_response, ReqId, Formatted}
      end,
      catch StreamTo ! {ibrowse_async_response_end, ReqId},
      %% We don't want to delete the Req-id to Pid mapping straightaway
      %% as the client may send a stream_next message just while we are
      %% sending back this ibrowse_async_response_end message. If we
      %% deleted this mapping straightaway, the caller will see a
      %% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
      %% get around this, we store the req id, and clear it after the
      %% next request. If there are weird combinations of stream,
      %% stream_once and sync requests on the same connection, it will
      %% take a while for the req_id-pid mapping to get cleared, but it
      %% should do no harm.
      ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
      Counted#state{prev_req_id = ReqId};
  do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
      Counted = dec_pipeline_counter(State),
      Formatted = format_response_data(Resp_format, Msg),
      catch StreamTo ! {ibrowse_async_response, ReqId, Formatted},
      Counted.
  %% Forward an intermediate piece of a response to the streaming target.
  %%
  %% Returns ok without sending anything when StreamTo is undefined.
  %% Otherwise Msg is passed through format_response_data/2 and sent as
  %% {ibrowse_async_response, ReqId, FormattedMsg}. The send is wrapped in
  %% `catch', and the value of that expression is returned.
  do_interim_reply(StreamTo, Response_format, ReqId, Msg) ->
      case StreamTo of
          undefined ->
              ok;
          _ ->
              Formatted = format_response_data(Response_format, Msg),
              catch StreamTo ! {ibrowse_async_response, ReqId, Formatted}
      end.
  %% Fail every request still outstanding on this connection with
  %% {error, Err}.
  %%
  %% Two groups are notified:
  %%   * every #request{} in the `reqs' queue: its req-id -> pid entry is
  %%     removed from the ibrowse_stream table, then do_reply/6 is called;
  %%   * every entry of `tunnel_setup_queue' (iterated as a plain list of
  %%     7-tuples whose first element is the caller's From). These are
  %%     answered through the synchronous path of do_reply/6.
  %%
  %% Err is the bare reason (for example previous_request_failed). Both
  %% groups receive it wrapped as {error, Err}, so a caller waiting on tunnel
  %% setup sees the same tagged-tuple shape as any other failed request
  %% rather than a bare reason term.
  %%
  %% The state returned by do_reply/6 is intentionally not threaded through;
  %% the function returns ok (the result of lists:foreach/2).
  do_error_reply(#state{reqs = Reqs, tunnel_setup_queue = Tun_q} = State, Err) ->
      Reply = {error, Err},
      lists:foreach(
        fun(#request{from = From, stream_to = StreamTo, req_id = ReqId,
                     response_format = Resp_format}) ->
                ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
                do_reply(State, From, StreamTo, ReqId, Resp_format, Reply)
        end, queue:to_list(Reqs)),
      lists:foreach(
        fun({From, _Url, _Headers, _Method, _Body, _Options, _Timeout}) ->
                do_reply(State, From, undefined, undefined, undefined, Reply)
        end, Tun_q).
  %% Answer the current request with Reply, then fail everything queued
  %% behind it with the reason previous_request_failed.
  %%
  %% The head of the `reqs' queue is dropped before the remaining requests
  %% are handed to do_error_reply/2. The current request itself is answered
  %% here, using the fields of `cur_req'.
  fail_pipelined_requests(#state{reqs = Pending, cur_req = Current} = State, Reply) ->
      {_Dropped, Remaining} = queue:out(Pending),
      #request{from = From,
               stream_to = StreamTo,
               req_id = ReqId,
               response_format = Resp_format} = Current,
      do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
      do_error_reply(State#state{reqs = Remaining}, previous_request_failed).
  %% Split List into {Front, Back} where Front holds the first N elements.
  %% If List has fewer than N elements, the result is {List, []}.
  split_list_at(List, N) ->
      split_list_at(List, N, []).

  %% Worker: Taken accumulates the front part in reverse order.
  split_list_at(Rest, 0, Taken) ->
      {lists:reverse(Taken), Rest};
  split_list_at([], _Remaining, Taken) ->
      {lists:reverse(Taken), []};
  split_list_at([Head | Rest], Remaining, Taken) ->
      split_list_at(Rest, Remaining - 1, [Head | Taken]).
  %% Interpret List, a string of hexadecimal digits with the most significant
  %% digit first, as a non-negative integer. The empty string yields 0.
  %% Each character is converted with to_ascii/1, which fails on anything
  %% that is not a hex digit.
  hexlist_to_integer(List) ->
      lists:foldl(fun(Digit, Acc) -> Acc * 16 + to_ascii(Digit) end, 0, List).
  %% Map one hexadecimal digit character ($0..$9, $a..$f, $A..$F) to its
  %% numeric value 0..15. Any other argument fails with function_clause.
  to_ascii(C) when is_integer(C), C >= $0, C =< $9 -> C - $0;
  to_ascii(C) when is_integer(C), C >= $A, C =< $F -> C - $A + 10;
  to_ascii(C) when is_integer(C), C >= $a, C =< $f -> C - $a + 10.
  %% Cancel a timer reference. `undefined' means "no timer" and yields ok;
  %% anything else is handed to erlang:cancel_timer/1, whose result is
  %% returned.
  cancel_timer(Ref) ->
      case Ref of
          undefined -> ok;
          _ -> erlang:cancel_timer(Ref)
      end.

  %% Cancel the timer, then remove at most one already-delivered copy of Msg
  %% from the caller's mailbox. The receive never blocks (`after 0').
  %% Always returns ok.
  cancel_timer(Ref, {eat_message, Msg}) ->
      cancel_timer(Ref),
      receive
          Msg -> ok
      after 0 -> ok
      end.
  %% Produce a fresh request identifier: the {MegaSecs, Secs, MicroSecs}
  %% tuple returned by erlang:now/0.
  %% NOTE(review): erlang:now/0 is deprecated in current OTP releases. The
  %% return shape is kept as-is here because request ids may be visible to
  %% callers - confirm before switching to another id source.
  make_req_id() ->
      erlang:now().
  %% Lower-case the ASCII letters A-Z in Str. Every other element is kept
  %% unchanged.
  to_lower(Str) ->
      [if
           C >= $A, C =< $Z -> C + 32;
           true -> C
       end || C <- Str].
  %% Remove this process' {PipelineSize, self()} entry from the table held in
  %% lb_ets_tid. Returns ok when no table is configured. The delete is
  %% wrapped in `catch', so a failing delete does not crash the caller.
  shutting_down(#state{lb_ets_tid = Tid, cur_pipeline_size = Sz}) ->
      case Tid of
          undefined ->
              ok;
          _ ->
              catch ets:delete(Tid, {Sz, self()})
      end.
  %% Increment cur_pipeline_size by one, unless the connection is marked as
  %% closing, in which case the state is returned unchanged.
  inc_pipeline_counter(#state{is_closing = Closing,
                              cur_pipeline_size = Pipe_sz} = State) ->
      case Closing of
          true ->
              State;
          _ ->
              State#state{cur_pipeline_size = Pipe_sz + 1}
      end.
  %% Decrement cur_pipeline_size by one and mirror the change in the table
  %% held in lb_ets_tid, whose keys have the form {PipelineSize, Pid}.
  %%
  %% The state is returned unchanged when the connection is closing or when
  %% no table is configured.
  dec_pipeline_counter(#state{is_closing = true} = State) ->
      State;
  dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
      State;
  dec_pipeline_counter(#state{lb_ets_tid = Tid,
                              cur_pipeline_size = Old_sz} = State) ->
      %% The size is part of the key, so the entry is re-keyed: the old entry
      %% is removed first, then one with the new size is inserted.
      ets:delete(Tid, {Old_sz, self()}),
      New_sz = Old_sz - 1,
      ets:insert(Tid, {{New_sz, self()}, []}),
      State#state{cur_pipeline_size = New_sz}.
  %% Cheap flatten that inspects only the first element:
  %%   * empty list               -> []
  %%   * first element an integer -> the list is returned as it is
  %%   * first element a list     -> lists:flatten/1 is applied
  %% A list starting with any other kind of term fails with function_clause.
  flatten([]) ->
      [];
  flatten([First | _] = Chars) when is_integer(First) ->
      Chars;
  flatten([First | _] = Nested) when is_list(First) ->
      lists:flatten(Nested).
  %% Return the chunk size to use when streaming a body.
  %%
  %% Options is a list of tuples. A {stream_chunk_size, V} entry is honoured
  %% only when V is a positive integer; otherwise
  %% ?DEFAULT_STREAM_CHUNK_SIZE is returned.
  %%
  %% The is_integer/1 test matters: in Erlang's term order every atom, list,
  %% tuple and binary compares greater than any number, so `V > 0' alone
  %% would accept a value such as `infinity' or "1024" as a chunk size.
  get_stream_chunk_size(Options) ->
      case lists:keysearch(stream_chunk_size, 1, Options) of
          {value, {_, V}} when is_integer(V), V > 0 ->
              V;
          _ ->
              ?DEFAULT_STREAM_CHUNK_SIZE
      end.
  %% Restart the inactivity timer: cancel the timer currently stored in the
  %% state, then arm a new one using the timeout from get_inac_timeout/1.
  set_inac_timer(State) ->
      cancel_timer(State#state.inactivity_timer_ref),
      Cleared = State#state{inactivity_timer_ref = undefined},
      Timeout = get_inac_timeout(State),
      set_inac_timer(Cleared, Timeout).

  %% Arm a timer that sends the message `timeout' to this process after
  %% Timeout milliseconds and store its reference in the state. When Timeout
  %% is not an integer (for example `infinity') no timer is started and the
  %% state is returned unchanged.
  set_inac_timer(State, Timeout) ->
      case is_integer(Timeout) of
          true ->
              Ref = erlang:send_after(Timeout, self(), timeout),
              State#state{inactivity_timer_ref = Ref};
          false ->
              State
      end.
  %% Determine the inactivity timeout that applies to the connection.
  %%
  %% With no current request, the value is looked up in this order:
  %%   1. ibrowse:get_config_value(inactivity_timeout, undefined), if it is
  %%      an integer;
  %%   2. the `inactivity_timeout' environment value of the ibrowse
  %%      application, if it is a positive integer;
  %%   3. 10000.
  %%
  %% With a current request, the request's own `inactivity_timeout' option is
  %% used, defaulting to `infinity'.
  get_inac_timeout(#state{cur_req = undefined}) ->
      Configured = ibrowse:get_config_value(inactivity_timeout, undefined),
      case is_integer(Configured) of
          true ->
              Configured;
          false ->
              case application:get_env(ibrowse, inactivity_timeout) of
                  {ok, Env_val} when is_integer(Env_val), Env_val > 0 ->
                      Env_val;
                  _ ->
                      10000
              end
      end;
  get_inac_timeout(#state{cur_req = #request{options = Opts}}) ->
      get_value(inactivity_timeout, Opts, infinity).
  %% Emit a trace entry containing the outgoing request, but only when the
  %% process dictionary entry my_trace_flag is exactly `true'. Returns ok
  %% otherwise.
  trace_request(Req) ->
      case get(my_trace_flag) =:= true of
          true ->
              %% The conversion to a binary is done only when tracing is on.
              Req_bin = to_binary(Req),
              do_trace("Sending request: ~n"
                       "--- Request Begin ---~n~s~n"
                       "--- Request End ---~n", [Req_bin]);
          false ->
              ok
      end.
  %% Emit a trace entry containing the outgoing request body.
  %%
  %% Nothing is traced unless the process dictionary entry my_trace_flag is
  %% `true'. Bodies larger than 1024 bytes are not traced. Returns ok in
  %% both of those cases.
  %%
  %% Any flag value other than `true' - including `undefined', which get/1
  %% returns when the flag was never stored - is treated as "tracing off",
  %% the same as in trace_request/1, instead of failing with case_clause.
  trace_request_body(Body) ->
      case get(my_trace_flag) of
          true ->
              %% The conversion to a binary is done only when tracing is on.
              NBody = to_binary(Body),
              case byte_size(NBody) > 1024 of
                  true ->
                      ok;
                  false ->
                      do_trace("Sending request body: ~n"
                               "--- Request Body Begin ---~n~s~n"
                               "--- Request Body End ---~n", [NBody])
              end;
          _ ->
              ok
      end.
  %% Coerce to an integer: integers are returned unchanged, strings are
  %% parsed with list_to_integer/1. Anything else fails with
  %% function_clause.
  to_integer(Int) when is_integer(Int) -> Int;
  to_integer(Str) when is_list(Str) -> list_to_integer(Str).
  %% Coerce to a binary: binaries are returned unchanged, lists are converted
  %% with list_to_binary/1. Anything else fails with function_clause.
  to_binary(Bin) when is_binary(Bin) -> Bin;
  to_binary(List) when is_list(List) -> list_to_binary(List).