You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

307 lines
17 KiB

5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
5 年之前
  1. -module(agSslAgencyIns).
  2. -include("agHttpCli.hrl").
  3. -include("erlArango.hrl").
  4. -compile(inline).
  5. -compile({inline_size, 128}).
  6. -export([
  7. %% 内部行为API
  8. init/1
  9. , handleMsg/3
  10. , terminate/3
  11. ]).
  %% @doc Builds the initial agency state for one SSL connection worker.
  %%
  %% The argument is `{PoolName, AgencyName, #agencyOpts{}}'. The reconnect
  %% settings of the options are turned into a reconnect state through
  %% agAgencyUtils:initReconnectState/3, and the CR/LF separators used when
  %% parsing responses are pre-compiled once here.
  %%
  %% No socket is opened in this function: a ?miDoNetConnect message is posted
  %% to the process itself and the connection is made when handleMsg/3
  %% receives it.
  %%
  %% Returns `{ok, SrvState, CliState}' (the previous spec claimed
  %% no_return(), which contradicted the actual return value).
  -spec init(term()) -> {ok, srvState(), cliState()}.
  init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
      ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
      %% Ask ourselves to connect once init/1 has returned.
      self() ! ?miDoNetConnect,
      SrvState = #srvState{
          poolName = PoolName,
          serverName = AgencyName,
          rn = binary:compile_pattern(<<"\r\n">>),
          rnrn = binary:compile_pattern(<<"\r\n\r\n">>),
          reconnectState = ReconnectState
      },
      {ok, SrvState, #cliState{backlogSize = BacklogSize}}.
  %% @doc Handles one message received by the SSL agency process.
  %%
  %% Clauses, in matching order:
  %%   #miRequest{}                       a caller's HTTP request;
  %%   {ssl, Socket, Data}                bytes received on the current socket;
  %%   {timeout, TimerRef, waiting_over}  the in-flight request's timer fired;
  %%   {ssl_closed, Socket}               the peer closed the connection;
  %%   {ssl_error, Socket, Reason}        the socket reported an error;
  %%   ?miDoNetConnect                    (re)establish the connection;
  %%   anything else                      logged and ignored.
  %%
  %% Every clause returns the (possibly updated) state as
  %% `{ok, SrvState, CliState}', or whatever the agAgencyUtils helper it
  %% delegates to returns.
  -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
  handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
      #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIn = RequestsIn, status = Status} = CliState) ->
      case Socket of
          undefined ->
              %% Not connected: answer immediately instead of queueing.
              agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, no_socket}),
              {ok, SrvState, CliState};
          _ ->
              case BacklogNum >= BacklogSize of
                  true ->
                      %% Too many requests already pending on this agency.
                      ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
                      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlog_full}),
                      {ok, SrvState, CliState};
                  _ ->
                      case Status of
                          leisure -> %% idle: send the request right away
                              Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
                              case ssl:send(Socket, Request) of
                                  ok ->
                                      %% OverTime is handed to the timer as an absolute time ({abs, true}).
                                      TimerRef =
                                          case OverTime of
                                              infinity ->
                                                  undefined;
                                              _ ->
                                                  erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
                                          end,
                                      {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
                                  {error, Reason} ->
                                      ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
                                      ssl:close(Socket),
                                      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socket_send_error, Reason}}),
                                      agAgencyUtils:dealClose(SrvState, CliState, {error, {socket_send_error, Reason}})
                              end;
                          _ ->
                              %% A request is already in flight: park this one in the queue.
                              agAgencyUtils:addQueue(RequestsIn, MiRequest),
                              {ok, SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1}}
                      end
              end
      end;
  handleMsg({ssl, Socket, Data},
      #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
      #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
      %% Only the parser call is protected; the `of' clauses run outside the try.
      try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
          {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
              %% Full response received: answer the caller, then serve the next queued request, if any.
              agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}),
              case agAgencyUtils:getQueue(RequestsOut + 1) of
                  undefined ->
                      {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
                  MiRequest ->
                      dealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
              end;
          {ok, NewRecvState} ->
              %% Response still incomplete: keep the parser state and wait for more data.
              {ok, SrvState, CliState#cliState{recvState = NewRecvState}};
          {error, Reason} ->
              ?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
              ssl:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}})
      catch
          E:R:S ->
              ?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
              ssl:close(Socket),
              %% Fixed: the reason used to be wrapped in an extra 1-tuple
              %% ({{error, _}}), unlike every other dealClose/3 call in this module.
              agAgencyUtils:dealClose(SrvState, CliState, {error, agency_handledata_error})
      end;
  handleMsg({timeout, TimerRef, waiting_over},
      #srvState{socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
      %% Matches only when the timer belongs to the request currently in flight.
      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
      %% After a timeout the socket is closed and a new connection is made, so
      %% that a late response to the timed-out request cannot arrive on this
      %% socket and be mistaken for the response to a later request.
      ssl:close(Socket),
      handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
  handleMsg({ssl_closed, Socket},
      #srvState{socket = Socket, serverName = ServerName} = SrvState,
      CliState) ->
      ?WARN(ServerName, "connection closed~n", []),
      ssl:close(Socket),
      agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
  handleMsg({ssl_error, Socket, Reason},
      #srvState{socket = Socket, serverName = ServerName} = SrvState,
      CliState) ->
      ?WARN(ServerName, "connection error: ~p~n", [Reason]),
      ssl:close(Socket),
      agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
  handleMsg(?miDoNetConnect,
      #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
      #cliState{requestsOut = RequestsOut} = CliState) ->
      case ?agBeamPool:get(PoolName) of
          #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
              case dealConnect(ServerName, HostName, Port, SocketOpts) of
                  {ok, Socket} ->
                      NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
                      %% Fixed: the connection parameters are now refreshed from the pool
                      %% options on every successful connect, including when a queued
                      %% request is sent straight away (previously only the empty-queue
                      %% branch stored host/dbName/userPassWord).
                      NewSrvState = SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket},
                      %% A fresh connection must not inherit receive buffers or
                      %% in-flight bookkeeping from the previous one.
                      NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined},
                      case agAgencyUtils:getQueue(RequestsOut + 1) of
                          undefined ->
                              {ok, NewSrvState, NewCliState};
                          MiRequest ->
                              dealQueueRequest(MiRequest, NewSrvState, NewCliState)
                      end;
                  {error, _Reason} ->
                      agAgencyUtils:reconnectTimer(SrvState, CliState)
              end;
          Ret ->
              ?WARN(ServerName, "deal connect not found agBeamPool:get(~p) ret ~p is error ~n", [PoolName, Ret]),
              %% Fixed: this branch used to return the result of the log call
              %% instead of the state tuple promised by the spec.
              {ok, SrvState, CliState}
      end;
  handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
      ?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
      {ok, SrvState, CliState}.
  %% @doc Shuts the agency down.
  %%
  %% With a live socket, overAllWork/2 is first given the chance to finish the
  %% in-flight and queued requests; afterwards the socket(s) are closed and
  %% agAgencyUtils:dealClose/3 is called with `{error, shutdown}'.
  %%
  %% Fixes over the previous version:
  %%   * when no connection exists (`socket = undefined') the drain step and
  %%     ssl:close/1 are skipped, because neither can work without a socket;
  %%   * if the drain step had to open a replacement socket, that socket is
  %%     closed too, not only the one the process started terminating with.
  -spec terminate(term(), srvState(), cliState()) -> ok.
  terminate(_Reason, #srvState{socket = undefined} = SrvState, CliState) ->
      agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}),
      ok;
  terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
      {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
      ssl:close(Socket),
      case NewSrvState#srvState.socket of
          undefined ->
              ok;
          Socket ->
              %% Same socket as above, already closed.
              ok;
          ReplacementSocket ->
              ssl:close(ReplacementSocket)
      end,
      agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
      ok.
  %% @doc Entry point of the shutdown drain.
  %%
  %% When the agency is idle (`status = leisure') the next queued request, if
  %% there is one, is handed to overDealQueueRequest/3; with an empty queue the
  %% state is returned unchanged. In any other status a request is in flight,
  %% so overReceiveSslData/2 is entered to wait for its outcome.
  -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overAllWork(SrvState, #cliState{status = leisure, requestsOut = Sent} = CliState) ->
      Pending = agAgencyUtils:getQueue(Sent + 1),
      case Pending of
          undefined ->
              {ok, SrvState, CliState};
          _ ->
              overDealQueueRequest(Pending, SrvState, CliState)
      end;
  overAllWork(SrvState, #cliState{} = CliState) ->
      overReceiveSslData(SrvState, CliState).
  %% @doc Shutdown-time handling of one request taken from the queue.
  %%
  %% The queue slot `requestsOut + 1' is removed first. A request whose
  %% deadline has passed is answered with `{error, timeout}' and the next
  %% queued request (if any) is processed the same way. Otherwise the request
  %% is sent over the socket and overReceiveSslData/2 waits for its response.
  %% A failed send closes the socket, answers the caller with
  %% `{error, socket_send_error}' and delegates to agAgencyUtils:dealClose/3.
  %%
  %% NOTE(review): the deadline is compared against
  %% erlang:system_time(millisecond), while the same value is given to
  %% erlang:start_timer/4 with `{abs, true}', which interprets it as Erlang
  %% monotonic time - confirm which time base the producer of `overTime' uses.
  -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overDealQueueRequest(#miRequest{requestId = ReqId, fromPid = Caller, overTime = Deadline} = Queued,
      #srvState{serverName = ServerName, socket = Socket} = SrvState,
      #cliState{requestsOut = Sent, backlogNum = Pending} = CliState) ->
      agAgencyUtils:delQueue(Sent + 1),
      Expired = erlang:system_time(millisecond) > Deadline,
      case Expired of
          true ->
              %% Deadline already passed while the request sat in the queue.
              agAgencyUtils:agencyReply(Caller, ReqId, undefined, {error, timeout}),
              Skipped = CliState#cliState{requestsOut = Sent + 1, backlogNum = Pending - 1},
              case agAgencyUtils:getQueue(Sent + 2) of
                  undefined ->
                      {ok, SrvState, Skipped};
                  Next ->
                      overDealQueueRequest(Next, SrvState, Skipped)
              end;
          false ->
              #miRequest{method = Method, path = Path, headers = Headers, body = Body, isSystem = IsSystem} = Queued,
              #srvState{host = Host, userPassWord = UserPassWord, dbName = DbName} = SrvState,
              Packet = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
              case ssl:send(Socket, Packet) of
                  {error, Reason} ->
                      ?WARN(ServerName, ":send error: ~p~n", [Reason]),
                      ssl:close(Socket),
                      agAgencyUtils:agencyReply(Caller, ReqId, undefined, {error, socket_send_error}),
                      agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error});
                  ok ->
                      %% No timer is armed for requests without a deadline.
                      Timer =
                          case Deadline of
                              infinity -> undefined;
                              _ -> erlang:start_timer(Deadline, self(), waiting_over, [{abs, true}])
                          end,
                      InFlight = CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = Sent + 1, curInfo = {Caller, ReqId, Timer}},
                      overReceiveSslData(SrvState, InFlight)
              end
      end.
  %% @doc Shutdown-time receive loop: blocks until the request in flight is
  %% resolved, then keeps draining the queue through overDealQueueRequest/3.
  %%
  %% Handled messages:
  %%   {ssl, Socket, Data}               fed to the response parser;
  %%   {timeout, TimerRef, waiting_over} the in-flight request timed out;
  %%   {ssl_closed, _} / {ssl_error, ..} socket is closed, dealClose/3 is called;
  %%   #miRequest{}                      late request, appended to the queue;
  %%   anything else                     logged, loop continues.
  %%
  %% Note that the receive has no `after' clause, so it waits as long as it
  %% takes for one of the messages above to arrive.
  -spec overReceiveSslData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overReceiveSslData(#srvState{socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, requestsIn = RequestsIn} = CliState) ->
      receive
          {ssl, Socket, Data} ->
              overHandleSslData(Data, SrvState, CliState);
          {timeout, TimerRef, waiting_over} ->
              overHandleWaitingOver(TimerRef, SrvState, CliState);
          {ssl_closed, Socket} ->
              ssl:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
          {ssl_error, Socket, Reason} ->
              ssl:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
          #miRequest{} = MiRequest ->
              agAgencyUtils:addQueue(RequestsIn, MiRequest),
              overReceiveSslData(SrvState, CliState#cliState{requestsIn = RequestsIn + 1, backlogNum = BacklogNum + 1});
          Msg ->
              ?WARN(overReceiveSslData, "receive unexpect msg: ~p~n", [Msg]),
              overReceiveSslData(SrvState, CliState)
      end.

  %% Feeds received bytes to the response parser. A complete response is
  %% delivered to the caller and the next queued request (if any) is started;
  %% an incomplete one goes back to the receive loop; a parser error or crash
  %% closes the socket and ends in agAgencyUtils:dealClose/3.
  -spec overHandleSslData(binary(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overHandleSslData(Data,
      #srvState{rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
      #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
      try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
          {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
              agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}),
              IdleCliState = CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined},
              case agAgencyUtils:getQueue(RequestsOut + 1) of
                  undefined ->
                      {ok, SrvState, IdleCliState};
                  MiRequest ->
                      overDealQueueRequest(MiRequest, SrvState, IdleCliState)
              end;
          {ok, NewRecvState} ->
              overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState});
          {error, Reason} ->
              ?WARN(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]),
              ssl:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_data_error, Reason}})
      catch
          E:R:S ->
              ?WARN(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]),
              ssl:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledata_error}})
      end.

  %% Handles a waiting_over timer. The first clause matches only when the
  %% timer reference is the one stored for the request in flight: the socket
  %% is closed, the caller gets `{error, timeout}', and, if more requests are
  %% queued, a new connection is opened for them. Any other timer reference is
  %% logged and the receive loop continues.
  -spec overHandleWaitingOver(reference(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overHandleWaitingOver(TimerRef,
      #srvState{poolName = PoolName, serverName = ServerName, socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, curInfo = {_FromPid, _RequestId, TimerRef} = CurInfo, requestsOut = RequestsOut} = CliState) ->
      ssl:close(Socket),
      agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
      %% The timed-out request is finished either way, so it is taken off the
      %% backlog count. Fixed: the reconnect branch below used to continue
      %% with a state that still counted it.
      IdleCliState = CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined},
      case agAgencyUtils:getQueue(RequestsOut + 1) of
          undefined ->
              {ok, SrvState, IdleCliState};
          MiRequest ->
              case ?agBeamPool:get(PoolName) of
                  #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
                      case dealConnect(ServerName, HostName, Port, SocketOpts) of
                          {ok, NewSocket} ->
                              %% New connection: start from a clean receive state.
                              overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, IdleCliState);
                          {error, Reason} ->
                              agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, Reason}})
                      end;
                  _Ret ->
                      agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}})
              end
      end;
  overHandleWaitingOver(TimerRef, SrvState, CliState) ->
      ?WARN(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
      overReceiveSslData(SrvState, CliState).
  %% @doc Opens an SSL connection to HostName:Port.
  %%
  %% HostName is resolved to its IPv4 addresses with inet:getaddrs/2, one of
  %% them is chosen through agMiscUtils:randomElement/1, and ssl:connect/4 is
  %% called on it with SocketOptions and ?DEFAULT_CONNECT_TIMEOUT.
  %%
  %% Returns `{ok, Socket}' or `{error, Reason}'; every failure is logged under
  %% ServerName before being returned.
  %%
  %% The spec now uses inet:port_number() for Port: the built-in port() type
  %% denotes an Erlang port identifier, not a TCP port number.
  -spec dealConnect(atom(), hostName(), inet:port_number(), socketOpts()) -> {ok, socket()} | {error, term()}.
  dealConnect(ServerName, HostName, Port, SocketOptions) ->
      case inet:getaddrs(HostName, inet) of
          {ok, IPList} ->
              Ip = agMiscUtils:randomElement(IPList),
              case ssl:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
                  {ok, _Socket} = Connected ->
                      Connected;
                  {error, Reason} = ConnectError ->
                      ?WARN(ServerName, "connect error: ~p~n", [Reason]),
                      ConnectError
              end;
          {error, Reason} = ResolveError ->
              ?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
              ResolveError
      end.
  %% @doc Sends one request that was waiting in the queue.
  %%
  %% The queue slot `requestsOut + 1' is removed first. If the request's
  %% deadline has already passed, the caller is answered with
  %% `{error, timeout}' and the following queued request (if any) is tried.
  %% Otherwise the request is written to the socket and the client state is
  %% switched to `waiting' with the caller, request id and timer recorded in
  %% `curInfo'. A failed send closes the socket, answers the caller with
  %% `{error, socket_send_error}' and delegates to agAgencyUtils:dealClose/3.
  %%
  %% NOTE(review): the deadline is checked against
  %% erlang:system_time(millisecond), yet the same value is passed to
  %% erlang:start_timer/4 with `{abs, true}', which treats it as Erlang
  %% monotonic time - confirm which time base `overTime' is expressed in.
  -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  dealQueueRequest(#miRequest{requestId = RequestId, fromPid = FromPid, overTime = OverTime} = Queued,
      #srvState{serverName = ServerName, socket = Socket} = SrvState,
      #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) ->
      agAgencyUtils:delQueue(RequestsOut + 1),
      case erlang:system_time(millisecond) of
          Now when Now > OverTime ->
              %% Expired while queued: report it and move on to the next one.
              agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
              Advanced = CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1},
              case agAgencyUtils:getQueue(RequestsOut + 2) of
                  undefined ->
                      {ok, SrvState, Advanced};
                  NextRequest ->
                      dealQueueRequest(NextRequest, SrvState, Advanced)
              end;
          _ ->
              #miRequest{method = Method, path = Path, headers = Headers, body = Body, isSystem = IsSystem} = Queued,
              #srvState{host = Host, userPassWord = UserPassWord, dbName = DbName} = SrvState,
              Payload = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
              case ssl:send(Socket, Payload) of
                  {error, Reason} ->
                      ?WARN(ServerName, ":send error: ~p~n", [Reason]),
                      ssl:close(Socket),
                      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
                      agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error});
                  ok ->
                      %% Requests without a deadline get no timer.
                      TimerRef =
                          case OverTime of
                              infinity -> undefined;
                              _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
                          end,
                      Waiting = CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}},
                      {ok, SrvState, Waiting}
              end
      end.
  295. end.