You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

400 lines
24 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. -module(agTcpAgencyIns).
  2. -include("agHttpCli.hrl").
  3. -include("erlArango.hrl").
  4. -compile(inline).
  5. -compile({inline_size, 128}).
  6. -export([
  7. %% 内部行为API
  8. init/1
  9. , handleMsg/3
  10. , terminate/3
  11. ]).
  %% @doc Builds the initial state of one TCP agency process.
  %%
  %% Expects `{PoolName, AgencyName, #agencyOpts{}}'. The reconnect state is
  %% derived from the options, and ?miDoNetConnect is posted to the process
  %% itself so that the connection attempt is made by handleMsg/3 instead of
  %% inside init/1.
  %%
  %% Returns `{ok, SrvState, CliState}'. (The previous spec claimed
  %% no_return(), which contradicted the value actually returned.)
  -spec init(term()) -> {ok, srvState(), cliState()}.
  init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
      ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
      %% Defer the connect: it is handled as a regular message by handleMsg/3.
      self() ! ?miDoNetConnect,
      SrvState = #srvState{
          poolName = PoolName,
          serverName = AgencyName,
          %% Compile the delimiter patterns once; they are handed to
          %% agHttpProtocol:response/5 for every received chunk.
          rn = binary:compile_pattern(<<"\r\n">>),
          rnrn = binary:compile_pattern(<<"\r\n\r\n">>),
          reconnectState = ReconnectState
      },
      {ok, SrvState, #cliState{backlogSize = BacklogSize}}.
  %% @doc Handles one message for the agency process and returns the next state
  %% as `{ok, SrvState, CliState}'.
  %%
  %% Messages handled:
  %%   #miRequest{}                    - a request to send, queue or reject
  %%   {tcp, Socket, Data}             - response bytes for the in-flight request
  %%   {timeout, TimerRef, waiting_over} - the in-flight request expired
  %%   {tcp_closed, Socket} / {tcp_error, Socket, Reason} - connection lost
  %%   ?miDoNetConnect                 - (re)establish the connection
  %% Anything else is logged and ignored.
  -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
  %% No connection: reject the request right away.
  handleMsg(#miRequest{requestId = RequestId, fromPid = FromPid},
      #srvState{socket = undefined} = SrvState,
      #cliState{} = CliState) ->
      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
      {ok, SrvState, CliState};
  %% Backlog exceeded: reject the request.
  handleMsg(#miRequest{requestId = RequestId, fromPid = FromPid},
      #srvState{serverName = ServerName} = SrvState,
      #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) when BacklogNum > BacklogSize ->
      ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
      {ok, SrvState, CliState};
  %% Idle connection: send the request immediately and start waiting.
  handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
      #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, status = leisure} = CliState) ->
      Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
      case gen_tcp:send(Socket, Request) of
          ok ->
              TimerRef =
                  case OverTime of
                      infinity ->
                          undefined;
                      _ ->
                          erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
                  end,
              {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
          {error, Reason} ->
              ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
              gen_tcp:close(Socket),
              agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
      end;
  %% A request is already in flight: queue this one (newest first).
  handleMsg(#miRequest{} = MiRequest,
      #srvState{} = SrvState,
      #cliState{backlogNum = BacklogNum, requestsIns = RequestsIns} = CliState) ->
      {ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}};
  handleMsg({tcp, Socket, Data},
      #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
      #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, recvState = RecvState} = CliState) ->
      try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
          {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
              agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}),
              %% The in-flight request is finished; go idle and pick up the next one.
              sendNextQueued(SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
          {ok, NewRecvState} ->
              %% Response incomplete: keep the parser state and wait for more data.
              {ok, SrvState, CliState#cliState{recvState = NewRecvState}};
          {error, Reason} ->
              ?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
      catch
          E:R:S ->
              ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError})
      end;
  handleMsg({timeout, TimerRef, waiting_over},
      #srvState{socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
      %% After a timeout the connection is closed and re-established, so that a
      %% late response to the expired request cannot be mistaken for the
      %% response to a later request.
      gen_tcp:close(Socket),
      handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
  handleMsg({tcp_closed, Socket},
      #srvState{socket = Socket, serverName = ServerName} = SrvState,
      CliState) ->
      ?WARN(ServerName, "connection closed~n", []),
      gen_tcp:close(Socket),
      agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
  handleMsg({tcp_error, Socket, Reason},
      #srvState{socket = Socket, serverName = ServerName} = SrvState,
      CliState) ->
      ?WARN(ServerName, "connection error: ~p~n", [Reason]),
      gen_tcp:close(Socket),
      agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
  handleMsg(?miDoNetConnect,
      #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
      #cliState{} = CliState) ->
      case ?agBeamPool:getv(PoolName) of
          #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
              case dealConnect(ServerName, HostName, Port, SocketOpts) of
                  {ok, Socket} ->
                      %% Always refresh the connection parameters from the pool
                      %% options, whether or not requests are queued; previously
                      %% only the empty-queue branch did so.
                      NewSrvState = SrvState#srvState{
                          userPassWord = UserPassword,
                          dbName = DbName,
                          host = Host,
                          reconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
                          socket = Socket
                      },
                      %% A fresh connection must not inherit receive buffers or
                      %% in-flight bookkeeping from the previous one.
                      sendNextQueued(NewSrvState, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined});
                  {error, _Reason} ->
                      agAgencyUtils:reconnectTimer(SrvState, CliState)
              end;
          Ret ->
              ?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, Ret]),
              %% Return the unchanged state; previously the result of ?WARN was
              %% returned here, which is not a valid handler result.
              {ok, SrvState, CliState}
      end;
  handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
      ?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
      {ok, SrvState, CliState}.

  %% Sends the oldest queued request, if any, via dealQueueRequest/3.
  %% requestsOuts holds requests oldest-first; requestsIns holds newly queued
  %% requests newest-first and is reversed into requestsOuts once the latter is
  %% exhausted. With both queues empty the state is returned unchanged.
  -spec sendNextQueued(srvState(), cliState()) -> {ok, term(), term()}.
  sendNextQueued(SrvState, #cliState{requestsIns = [], requestsOuts = []} = CliState) ->
      {ok, SrvState, CliState};
  sendNextQueued(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = []} = CliState) ->
      [MiRequest | Outs] = lists:reverse(RequestsIns),
      dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs});
  sendNextQueued(SrvState, #cliState{requestsOuts = [MiRequest | Outs]} = CliState) ->
      dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}).
  %% @doc Shuts the agency down: lets overAllWork/2 finish outstanding work,
  %% closes the connection and hands the final state to
  %% agAgencyUtils:dealClose/3 with `{error, shutdown}'.
  %%
  %% The socket to close is taken from the state returned by overAllWork/2,
  %% not from the incoming state: the process may be terminating without a
  %% connection (socket = undefined, on which gen_tcp:close/1 would crash), and
  %% overAllWork/2 may have replaced the socket while reconnecting.
  -spec terminate(term(), srvState(), cliState()) -> ok.
  terminate(_Reason, #srvState{} = SrvState, CliState) ->
      {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
      case NewSrvState of
          #srvState{socket = Socket} when Socket =/= undefined ->
              gen_tcp:close(Socket);
          _ ->
              ok
      end,
      agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
      ok.
  %% @doc Finishes outstanding work before the process terminates.
  %%
  %% When the connection is idle (status = leisure) the oldest queued request,
  %% if any, is handed to dealQueueRequest/3: requestsOuts is consumed first,
  %% and once it is empty requestsIns is reversed to become the new
  %% requestsOuts. With nothing queued the state is returned as is.
  %% In any other status a request is in flight, so control passes to
  %% overReceiveTcpData/2.
  %%
  %% NOTE(review): dealQueueRequest/3 returns right after sending, without
  %% waiting for the response - confirm that this is intended on the
  %% shutdown path.
  -spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overAllWork(SrvState, #cliState{status = leisure, requestsIns = [], requestsOuts = []} = CliState) ->
      {ok, SrvState, CliState};
  overAllWork(SrvState, #cliState{status = leisure, requestsIns = Pending, requestsOuts = []} = CliState) ->
      [Oldest | Rest] = lists:reverse(Pending),
      dealQueueRequest(Oldest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Rest});
  overAllWork(SrvState, #cliState{status = leisure, requestsOuts = [Oldest | Rest]} = CliState) ->
      dealQueueRequest(Oldest, SrvState, CliState#cliState{requestsOuts = Rest});
  overAllWork(SrvState, #cliState{status = Status} = CliState) when Status =/= leisure ->
      overReceiveTcpData(SrvState, CliState).
  %% @doc Shutdown-path variant of dealQueueRequest/3: sends one queued request
  %% and then blocks in overReceiveTcpData/2 until it completes.
  %%
  %% If the request's deadline has already passed, the caller is answered with
  %% `{error, timeout}', the backlog counter is decremented and the next queued
  %% request (requestsOuts first, then the reversed requestsIns) is tried.
  %% On a send failure the socket is closed, the caller is answered with
  %% `{error, socketSendError}' and agAgencyUtils:dealClose/3 takes over.
  %%
  %% The deadline is compared against erlang:monotonic_time/1 because the same
  %% OverTime value is passed to erlang:start_timer/4 with `{abs, true}', which
  %% interprets it as absolute Erlang monotonic time. An OverTime of `infinity'
  %% never compares as expired (numbers order before atoms).
  %% NOTE(review): this assumes callers compute OverTime from
  %% erlang:monotonic_time(millisecond) - confirm against the request builder.
  -spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
      #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
      #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
      case erlang:monotonic_time(millisecond) > OverTime of
          true ->
              %% Already expired while waiting in the queue.
              agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
              Remaining = CliState#cliState{backlogNum = BacklogNum - 1},
              case {RequestsOuts, RequestsIns} of
                  {[], []} ->
                      {ok, SrvState, Remaining};
                  {[], _} ->
                      [Next | Outs] = lists:reverse(RequestsIns),
                      overDealQueueRequest(Next, SrvState, Remaining#cliState{requestsIns = [], requestsOuts = Outs});
                  {[Next | Outs], _} ->
                      overDealQueueRequest(Next, SrvState, Remaining#cliState{requestsOuts = Outs})
              end;
          false ->
              Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
              case gen_tcp:send(Socket, Request) of
                  ok ->
                      TimerRef =
                          case OverTime of
                              infinity ->
                                  undefined;
                              _ ->
                                  erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
                          end,
                      overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}});
                  {error, Reason} ->
                      ?WARN(ServerName, ":send error: ~p~n", [Reason]),
                      gen_tcp:close(Socket),
                      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
                      agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
              end
      end.
  %% @doc Shutdown-path receive loop: blocks until the in-flight request is
  %% resolved (response complete, timeout, or connection failure).
  %%
  %% While waiting, newly arriving #miRequest{} messages are queued, timeout
  %% messages whose reference does not match the in-flight request are logged
  %% and skipped, and any other message is logged and skipped.
  %% There is no `after' clause, so this waits for as long as it takes one of
  %% the terminating messages to arrive.
  -spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overReceiveTcpData(#srvState{socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns} = CliState) ->
      receive
          {tcp, Socket, Data} ->
              overHandleTcpData(Data, SrvState, CliState);
          {timeout, TimerRef, waiting_over} ->
              case CurInfo of
                  {_PidForm, _RequestId, TimerRef} ->
                      overHandleTimeout(SrvState, CliState);
                  _ ->
                      ?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
                      overReceiveTcpData(SrvState, CliState)
              end;
          {tcp_closed, Socket} ->
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
          {tcp_error, Socket, Reason} ->
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
          #miRequest{} = MiRequest ->
              overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
          _Msg ->
              ?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
              overReceiveTcpData(SrvState, CliState)
      end.

  %% Feeds one chunk of socket data to the response parser. A complete response
  %% is delivered to the waiting caller and the next queued request (if any) is
  %% sent; an incomplete one continues the receive loop; a parse error or crash
  %% closes the socket and defers to agAgencyUtils:dealClose/3.
  %% NOTE(review): the next request is sent through dealQueueRequest/3, which
  %% returns without waiting for its response, whereas the timeout path uses
  %% overDealQueueRequest/3 - confirm this difference is intended.
  -spec overHandleTcpData(binary(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overHandleTcpData(Data,
      #srvState{rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
      #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
      try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
          {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
              agAgencyUtils:agencyReply(CurInfo, {ok, Body, StatusCode, Headers}),
              Idle = CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined},
              case {RequestsOuts, RequestsIns} of
                  {[], []} ->
                      {ok, SrvState, Idle};
                  {[], _} ->
                      [Next | Outs] = lists:reverse(RequestsIns),
                      dealQueueRequest(Next, SrvState, Idle#cliState{requestsIns = [], requestsOuts = Outs});
                  {[Next | Outs], _} ->
                      dealQueueRequest(Next, SrvState, Idle#cliState{requestsOuts = Outs})
              end;
          {ok, NewRecvState} ->
              overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
          {error, Reason} ->
              ?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
      catch
          E:R:S ->
              ?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
              gen_tcp:close(Socket),
              agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}})
      end.

  %% The in-flight request expired: close the socket, answer the caller with
  %% `{error, timeout}' and, if requests are queued, reconnect and continue with
  %% the oldest one. The backlog counter is decremented for the expired request
  %% in every case; previously this happened only when the queue was empty.
  -spec overHandleTimeout(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  overHandleTimeout(#srvState{socket = Socket} = SrvState,
      #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
      gen_tcp:close(Socket),
      agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
      Idle = CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined},
      case {RequestsOuts, RequestsIns} of
          {[], []} ->
              {ok, SrvState, Idle};
          {[], _} ->
              [Next | Outs] = lists:reverse(RequestsIns),
              overReconnectAndSend(Next, SrvState, CliState, Idle#cliState{requestsIns = [], requestsOuts = Outs});
          {[Next | Outs], _} ->
              overReconnectAndSend(Next, SrvState, CliState, Idle#cliState{requestsOuts = Outs})
      end.

  %% Opens a fresh connection from the pool's options and sends MiRequest over
  %% it with NextCliState. If the pool options are missing or the connect
  %% fails, FailCliState (the state from before the request was dequeued) is
  %% handed to agAgencyUtils:dealClose/3.
  -spec overReconnectAndSend(miRequest(), srvState(), cliState(), cliState()) -> {ok, srvState(), cliState()}.
  overReconnectAndSend(MiRequest, #srvState{poolName = PoolName, serverName = ServerName} = SrvState, FailCliState, NextCliState) ->
      case ?agBeamPool:getv(PoolName) of
          #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
              case dealConnect(ServerName, HostName, Port, SocketOpts) of
                  {ok, NewSocket} ->
                      overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NextCliState);
                  {error, Reason} ->
                      agAgencyUtils:dealClose(SrvState, FailCliState, {error, {newTcpConnectErrorOver, Reason}})
              end;
          _Ret ->
              agAgencyUtils:dealClose(SrvState, FailCliState, {error, {notFoundPoolName, PoolName}})
      end.
  %% @doc Resolves HostName to its IPv4 addresses, picks one with
  %% agMiscUtils:randomElement/1 and opens a TCP connection to it on Port using
  %% SocketOptions and ?DEFAULT_CONNECT_TIMEOUT.
  %%
  %% Returns `{ok, Socket}' or `{error, Reason}'; both a resolution failure and
  %% a connect failure are logged under ServerName before being returned.
  %%
  %% Port is a TCP port number, hence inet:port_number() in the spec; the
  %% built-in port() type denotes an Erlang port identifier.
  -spec dealConnect(atom(), hostName(), inet:port_number(), socketOpts()) -> {ok, socket()} | {error, term()}.
  dealConnect(ServerName, HostName, Port, SocketOptions) ->
      case inet:getaddrs(HostName, inet) of
          {ok, IPList} ->
              Ip = agMiscUtils:randomElement(IPList),
              case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
                  {ok, _Socket} = Connected ->
                      Connected;
                  {error, ConnectReason} = ConnectError ->
                      ?WARN(ServerName, "connect error: ~p~n", [ConnectReason]),
                      ConnectError
              end;
          {error, ResolveReason} = ResolveError ->
              ?WARN(ServerName, "getaddrs error: ~p~n", [ResolveReason]),
              ResolveError
      end.
  %% @doc Sends one previously queued request and returns without waiting for
  %% the response (status becomes `waiting').
  %%
  %% If the request's deadline has already passed, the caller is answered with
  %% `{error, timeout}', the backlog counter is decremented and the next queued
  %% request (requestsOuts first, then the reversed requestsIns) is tried.
  %% On a send failure the socket is closed, the caller is answered with
  %% `{error, socketSendError}' and agAgencyUtils:dealClose/3 takes over.
  %%
  %% The deadline is compared against erlang:monotonic_time/1 because the same
  %% OverTime value is passed to erlang:start_timer/4 with `{abs, true}', which
  %% interprets it as absolute Erlang monotonic time. An OverTime of `infinity'
  %% never compares as expired (numbers order before atoms).
  %% NOTE(review): this assumes callers compute OverTime from
  %% erlang:monotonic_time(millisecond) - confirm against the request builder.
  -spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
  dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
      #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
      #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
      case erlang:monotonic_time(millisecond) > OverTime of
          true ->
              %% Already expired while waiting in the queue.
              agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
              Remaining = CliState#cliState{backlogNum = BacklogNum - 1},
              case {RequestsOuts, RequestsIns} of
                  {[], []} ->
                      {ok, SrvState, Remaining};
                  {[], _} ->
                      [Next | Outs] = lists:reverse(RequestsIns),
                      dealQueueRequest(Next, SrvState, Remaining#cliState{requestsIns = [], requestsOuts = Outs});
                  {[Next | Outs], _} ->
                      dealQueueRequest(Next, SrvState, Remaining#cliState{requestsOuts = Outs})
              end;
          false ->
              Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
              case gen_tcp:send(Socket, Request) of
                  ok ->
                      TimerRef =
                          case OverTime of
                              infinity ->
                                  undefined;
                              _ ->
                                  erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
                          end,
                      {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
                  {error, Reason} ->
                      ?WARN(ServerName, ":send error: ~p~n", [Reason]),
                      gen_tcp:close(Socket),
                      agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
                      agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
              end
      end.