You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

324 line
14 KiB

3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
3 年之前
  1. %% Copyright (c) 2016-2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_stream_h).
  15. -behavior(cowboy_stream).
  16. -export([init/3]).
  17. -export([data/4]).
  18. -export([info/3]).
  19. -export([terminate/3]).
  20. -export([early_error/5]).
  21. -export([request_process/3]).
  22. -export([resume/5]).
  23. -record(state, {
  24. next :: any(),
  25. ref = undefined :: ranch:ref(),
  26. pid = undefined :: pid(),
  27. expect = undefined :: undefined | continue,
  28. read_body_pid = undefined :: pid() | undefined,
  29. read_body_ref = undefined :: reference() | undefined,
  30. read_body_timer_ref = undefined :: reference() | undefined,
  31. read_body_length = 0 :: non_neg_integer() | infinity | auto,
  32. read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
  33. read_body_buffer = <<>> :: binary(),
  34. body_length = 0 :: non_neg_integer(),
  35. stream_body_pid = undefined :: pid() | undefined,
  36. stream_body_status = normal :: normal | blocking | blocked
  37. }).
  38. -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
  39. -> {[{spawn, pid(), timeout()}], #state{}}.
  40. init(StreamID, Req = #{ref := Ref}, Opts) ->
  41. Env = maps:get(env, Opts, #{}),
  42. Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
  43. Shutdown = maps:get(shutdown_timeout, Opts, 5000),
  44. Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
  45. Expect = expect(Req),
  46. {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
  47. {[{spawn, Pid, Shutdown} | Commands],
  48. #state{next = Next, ref = Ref, pid = Pid, expect = Expect}}.
  49. %% Ignore the expect header in HTTP/1.0.
  50. expect(#{version := 'HTTP/1.0'}) ->
  51. undefined;
  52. expect(Req) ->
  53. try cowboy_req:parse_header(<<"expect">>, Req) of
  54. Expect ->
  55. Expect
  56. catch _:_ ->
  57. undefined
  58. end.
  59. %% If we receive data and stream is waiting for data:
  60. %% If we accumulated enough data or IsFin=fin, send it.
  61. %% If we are in auto mode, send it and update flow control.
  62. %% If not, buffer it.
  63. %% If not, buffer it.
  64. %%
  65. %% We always reset the expect field when we receive data,
  66. %% since the client started sending the request body before
  67. %% we could send a 100 continue response.
  68. -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
  69. -> {cowboy_stream:commands(), State} when State :: #state{}.
  70. %% Stream isn't waiting for data.
  71. data(StreamID, IsFin, Data, State = #state{
  72. read_body_ref = undefined, read_body_buffer = Buffer, body_length = BodyLen}) ->
  73. do_data(StreamID, IsFin, Data, [], State#state{
  74. expect = undefined,
  75. read_body_is_fin = IsFin,
  76. read_body_buffer = <<Buffer/binary, Data/binary>>,
  77. body_length = BodyLen + byte_size(Data)
  78. });
  79. %% Stream is waiting for data using auto mode.
  80. %%
  81. %% There is no buffering done in auto mode.
  82. data(StreamID, IsFin, Data, State = #state{read_body_pid = Pid, read_body_ref = Ref,
  83. read_body_length = auto, body_length = BodyLen}) ->
  84. send_request_body(Pid, Ref, IsFin, BodyLen, Data),
  85. do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{
  86. read_body_ref = undefined,
  87. %% @todo This is wrong, it's missing byte_size(Data).
  88. body_length = BodyLen
  89. });
  90. %% Stream is waiting for data but we didn't receive enough to send yet.
  91. data(StreamID, IsFin = nofin, Data, State = #state{
  92. read_body_length = ReadLen, read_body_buffer = Buffer, body_length = BodyLen})
  93. when byte_size(Data) + byte_size(Buffer) < ReadLen ->
  94. do_data(StreamID, IsFin, Data, [], State#state{
  95. expect = undefined,
  96. read_body_buffer = <<Buffer/binary, Data/binary>>,
  97. body_length = BodyLen + byte_size(Data)
  98. });
  99. %% Stream is waiting for data and we received enough to send.
  100. data(StreamID, IsFin, Data, State = #state{read_body_pid = Pid, read_body_ref = Ref,
  101. read_body_timer_ref = TRef, read_body_buffer = Buffer, body_length = BodyLen0}) ->
  102. BodyLen = BodyLen0 + byte_size(Data),
  103. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
  104. send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
  105. do_data(StreamID, IsFin, Data, [], State#state{
  106. expect = undefined,
  107. read_body_ref = undefined,
  108. read_body_timer_ref = undefined,
  109. read_body_buffer = <<>>,
  110. body_length = BodyLen
  111. }).
  112. do_data(StreamID, IsFin, Data, Commands1, State = #state{next = Next0}) ->
  113. {Commands2, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
  114. {Commands1 ++ Commands2, State#state{next = Next}}.
  115. -spec info(cowboy_stream:streamid(), any(), State)
  116. -> {cowboy_stream:commands(), State} when State :: #state{}.
  117. info(StreamID, Info = {'EXIT', Pid, normal}, State = #state{pid = Pid}) ->
  118. do_info(StreamID, Info, [stop], State);
  119. info(StreamID, Info = {'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
  120. State = #state{pid = Pid}) ->
  121. Status = case Reason of
  122. timeout -> 408;
  123. payload_too_large -> 413;
  124. _ -> 400
  125. end,
  126. %% @todo Headers? Details in body? Log the crash? More stuff in debug only?
  127. do_info(StreamID, Info, [
  128. {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
  129. stop
  130. ], State);
  131. info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State = #state{ref = Ref, pid = Pid}) ->
  132. Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
  133. Commands = case Reason of
  134. normal -> Commands0;
  135. shutdown -> Commands0;
  136. {shutdown, _} -> Commands0;
  137. _ -> [{log, error,
  138. "Ranch listener ~p, connection process ~p, stream ~p "
  139. "had its request process ~p exit with reason "
  140. "~999999p and stacktrace ~999999p~n",
  141. [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
  142. | Commands0]
  143. end,
  144. do_info(StreamID, Exit, [
  145. {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
  146. | Commands], State);
  147. %% Request body, auto mode, no body buffered.
  148. info(StreamID, Info = {read_body, Pid, Ref, auto, infinity}, State = #state{read_body_buffer = <<>>}) ->
  149. do_info(StreamID, Info, [], State#state{
  150. read_body_pid = Pid,
  151. read_body_ref = Ref,
  152. read_body_length = auto
  153. });
  154. %% Request body, auto mode, body buffered or complete.
  155. info(StreamID, Info = {read_body, Pid, Ref, auto, infinity}, State = #state{
  156. read_body_is_fin = IsFin, read_body_buffer = Buffer, body_length = BodyLen}) ->
  157. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  158. do_info(StreamID, Info, [{flow, byte_size(Buffer)}],
  159. State#state{read_body_buffer = <<>>});
  160. %% Request body, body buffered large enough or complete.
  161. %%
  162. %% We do not send a 100 continue response if the client
  163. %% already started sending the body.
  164. info(StreamID, Info = {read_body, Pid, Ref, Length, _}, State = #state{
  165. read_body_is_fin = IsFin, read_body_buffer = Buffer, body_length = BodyLen})
  166. when IsFin =:= fin; byte_size(Buffer) >= Length ->
  167. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  168. do_info(StreamID, Info, [], State#state{read_body_buffer = <<>>});
  169. %% Request body, not enough to send yet.
  170. info(StreamID, Info = {read_body, Pid, Ref, Length, Period}, State = #state{expect = Expect}) ->
  171. Commands = case Expect of
  172. continue -> [{inform, 100, #{}}, {flow, Length}];
  173. undefined -> [{flow, Length}]
  174. end,
  175. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
  176. do_info(StreamID, Info, Commands, State#state{
  177. read_body_pid = Pid,
  178. read_body_ref = Ref,
  179. read_body_timer_ref = TRef,
  180. read_body_length = Length
  181. });
  182. %% Request body reading timeout; send what we got.
  183. info(StreamID, Info = {read_body_timeout, Ref}, State = #state{read_body_pid = Pid, read_body_ref = Ref,
  184. read_body_is_fin = IsFin, read_body_buffer = Buffer, body_length = BodyLen}) ->
  185. send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
  186. do_info(StreamID, Info, [], State#state{
  187. read_body_ref = undefined,
  188. read_body_timer_ref = undefined,
  189. read_body_buffer = <<>>
  190. });
  191. info(StreamID, Info = {read_body_timeout, _}, State) ->
  192. do_info(StreamID, Info, [], State);
  193. %% Response.
  194. %%
  195. %% We reset the expect field when a 100 continue response
  196. %% is sent or when any final response is sent.
  197. info(StreamID, Inform = {inform, Status, _}, State0) ->
  198. State = case cow_http:status_to_integer(Status) of
  199. 100 -> State0#state{expect = undefined};
  200. _ -> State0
  201. end,
  202. do_info(StreamID, Inform, [Inform], State);
  203. info(StreamID, Response = {response, _, _, _}, State) ->
  204. do_info(StreamID, Response, [Response], State#state{expect = undefined});
  205. info(StreamID, Headers = {headers, _, _}, State) ->
  206. do_info(StreamID, Headers, [Headers], State#state{expect = undefined});
  207. %% Sending data involves the data message, the stream_buffer_full alarm
  208. %% and the connection_buffer_full alarm. We stop sending acks when an alarm is on.
  209. %%
  210. %% We only apply backpressure when the message includes a pid. Otherwise
  211. %% it is a message from Cowboy, or the user circumventing the backpressure.
  212. %%
  213. %% We currently do not support sending data from multiple processes concurrently.
  214. info(StreamID, Data = {data, _, _}, State) ->
  215. do_info(StreamID, Data, [Data], State);
  216. info(StreamID, Data0 = {data, Pid, _, _}, State0 = #state{stream_body_status = Status}) ->
  217. State = case Status of
  218. normal ->
  219. Pid ! {data_ack, self()},
  220. State0;
  221. blocking ->
  222. State0#state{stream_body_pid = Pid, stream_body_status = blocked};
  223. blocked ->
  224. State0
  225. end,
  226. Data = erlang:delete_element(2, Data0),
  227. do_info(StreamID, Data, [Data], State);
  228. info(StreamID, Alarm = {alarm, Name, on}, State0 = #state{stream_body_status = Status})
  229. when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
  230. State = case Status of
  231. normal -> State0#state{stream_body_status = blocking};
  232. _ -> State0
  233. end,
  234. do_info(StreamID, Alarm, [], State);
  235. info(StreamID, Alarm = {alarm, Name, off}, State = #state{stream_body_pid = Pid, stream_body_status = Status})
  236. when Name =:= connection_buffer_full; Name =:= stream_buffer_full ->
  237. _ = case Status of
  238. normal -> ok;
  239. blocking -> ok;
  240. blocked -> Pid ! {data_ack, self()}
  241. end,
  242. do_info(StreamID, Alarm, [], State#state{stream_body_pid = undefined, stream_body_status = normal});
  243. info(StreamID, Trailers = {trailers, _}, State) ->
  244. do_info(StreamID, Trailers, [Trailers], State);
  245. info(StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
  246. do_info(StreamID, Push, [Push], State);
  247. info(StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
  248. do_info(StreamID, SwitchProtocol, [SwitchProtocol], State#state{expect = undefined});
  249. %% Convert the set_options message to a command.
  250. info(StreamID, SetOptions = {set_options, _}, State) ->
  251. do_info(StreamID, SetOptions, [SetOptions], State);
  252. %% Unknown message, either stray or meant for a handler down the line.
  253. info(StreamID, Info, State) ->
  254. do_info(StreamID, Info, [], State).
  255. do_info(StreamID, Info, Commands1, State0 = #state{next = Next0}) ->
  256. {Commands2, Next} = cowboy_stream:info(StreamID, Info, Next0),
  257. {Commands1 ++ Commands2, State0#state{next = Next}}.
  258. -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
  259. terminate(StreamID, Reason, #state{next = Next}) ->
  260. cowboy_stream:terminate(StreamID, Reason, Next).
  261. -spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
  262. cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
  263. when Resp :: cowboy_stream:resp_command().
  264. early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
  265. cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
  266. send_request_body(Pid, Ref, nofin, _, Data) ->
  267. Pid ! {request_body, Ref, nofin, Data},
  268. ok;
  269. send_request_body(Pid, Ref, fin, BodyLen, Data) ->
  270. Pid ! {request_body, Ref, fin, BodyLen, Data},
  271. ok.
  272. %% Request process.
  273. %% We add the stacktrace to exit exceptions here in order
  274. %% to simplify the debugging of errors. The proc_lib library
  275. %% already adds the stacktrace to other types of exceptions.
  276. -spec request_process(cowboy_req:req(), cowboy_middleware:env(), [module()]) -> ok.
  277. request_process(Req, Env, Middlewares) ->
  278. try
  279. execute(Req, Env, Middlewares)
  280. catch
  281. exit:Reason = {shutdown, _}:Stacktrace ->
  282. erlang:raise(exit, Reason, Stacktrace);
  283. exit:Reason:Stacktrace when Reason =/= normal, Reason =/= shutdown ->
  284. erlang:raise(exit, {Reason, Stacktrace}, Stacktrace)
  285. end.
  286. execute(_, _, []) ->
  287. ok;
  288. execute(Req, Env, [Middleware | Tail]) ->
  289. case Middleware:execute(Req, Env) of
  290. {ok, Req2, Env2} ->
  291. execute(Req2, Env2, Tail);
  292. {suspend, Module, Function, Args} ->
  293. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
  294. {stop, _Req2} ->
  295. ok
  296. end.
  297. -spec resume(cowboy_middleware:env(), [module()], module(), atom(), [any()]) -> ok.
  298. resume(Env, Tail, Module, Function, Args) ->
  299. case apply(Module, Function, Args) of
  300. {ok, Req2, Env2} ->
  301. execute(Req2, Env2, Tail);
  302. {suspend, Module2, Function2, Args2} ->
  303. proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
  304. {stop, _Req2} ->
  305. ok
  306. end.