arangodb erlang数据库驱动
Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

142 righe
7.1 KiB

  1. -module(agSslAgencyIns).
  2. -include("agVstCli.hrl").
  3. -include("eArango.hrl").
  4. -compile(inline).
  5. -compile({inline_size, 128}).
  6. -export([
  7. %% Inner Behavior API
  8. init/1
  9. , handleMsg/3
  10. , terminate/3
  11. ]).
  12. -spec init(term()) -> no_return().
  13. init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
  14. self() ! ?AgMDoDBConn,
  15. ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
  16. {ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}.
  17. -spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
  18. handleMsg(#agReq{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
  19. #srvState{serverName = ServerName, dbName = DbName, socket = Socket, vstSize = VstSize} = SrvState,
  20. #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) ->
  21. case Socket of
  22. undefined ->
  23. agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, noSocket}),
  24. {ok, SrvState, CliState};
  25. _ ->
  26. case BacklogNum >= BacklogSize of
  27. true ->
  28. ?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
  29. agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, backlogFull}),
  30. {ok, SrvState, CliState};
  31. _ ->
  32. Request = agVstProto:request(IsSystem, MessageId, Method, DbName, Path, QueryPars, Headers, Body, VstSize),
  33. case ssl:send(Socket, Request) of
  34. ok ->
  35. TimerRef = case OverTime of
  36. infinity ->
  37. undefined;
  38. _ ->
  39. erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId, FromPid}, [{abs, true}])
  40. end,
  41. erlang:put(MessageId, {FromPid, TimerRef, 0, <<>>}),
  42. {ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}};
  43. {error, Reason} ->
  44. ?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]),
  45. ssl:close(Socket),
  46. agAgencyUtils:agencyReply(FromPid, undefined, MessageId, {error, {socketSendError, Reason}}),
  47. agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
  48. end
  49. end
  50. end;
  51. handleMsg({ssl, _Socket, DataBuffer}, SrvState,
  52. #cliState{revStatus = RevStatus, backlogNum = BacklogNum, messageId = OldMessageId, chunkIdx = OldChunkIdx, chunkSize = OldChunkSize, chunkBuffer = OldChunkBuffer} = CliState) ->
  53. case agVstProto:response(RevStatus, 0, OldMessageId, OldChunkIdx, OldChunkSize, OldChunkBuffer, DataBuffer) of
  54. {?AgUndef, DoneCnt} ->
  55. {ok, SrvState, CliState#cliState{revStatus = ?AgUndef, backlogNum = BacklogNum - DoneCnt, chunkBuffer = <<>>}};
  56. {?AgCBodyStart, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer} ->
  57. {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, messageId = MessageId, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer}};
  58. {?AgCBodyGoOn, DoneCnt, ChunkBuffer} ->
  59. {ok, SrvState, CliState#cliState{revStatus = ?AgCBody, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}};
  60. {?AgCHeader, DoneCnt, ChunkBuffer} ->
  61. {ok, SrvState, CliState#cliState{revStatus = ?AgCHeader, backlogNum = BacklogNum - DoneCnt, chunkBuffer = ChunkBuffer}}
  62. end;
  63. handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState,
  64. #cliState{backlogNum = BacklogNum} = CliState) ->
  65. MsgCache = erlang:get(MessageId),
  66. MsgPF = erlang:setelement(?AgPFIdx, MsgCache, timeOut),
  67. erlang:put(MessageId, MsgPF),
  68. agAgencyUtils:agencyReTimeout(FromPid, MessageId, {error, timeout}),
  69. {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
  70. handleMsg({ssl_closed, Socket},
  71. #srvState{socket = Socket, serverName = ServerName} = SrvState,
  72. CliState) ->
  73. ?AgWarn(ServerName, "connection closed~n", []),
  74. ssl:close(Socket),
  75. agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
  76. handleMsg({ssl_error, Socket, Reason},
  77. #srvState{socket = Socket, serverName = ServerName} = SrvState,
  78. CliState) ->
  79. ?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
  80. ssl:close(Socket),
  81. agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
  82. handleMsg(?AgMDoDBConn,
  83. #srvState{poolName = PoolName, serverName = ServerName, reConnState = _ReConnState} = SrvState,
  84. CliState) ->
  85. case ?agBeamPool:getv(PoolName) of
  86. #dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, vstSize = VstSize} ->
  87. case ssl:connect(HostName, Port, ?AgDefSocketOpts, ?AgDefConnTimeout) of
  88. {ok, Socket} ->
  89. ssl:send(Socket, ?AgUpgradeInfo),
  90. AuthInfo = agVstProto:authInfo(User, Password),
  91. ssl:send(Socket, AuthInfo),
  92. case agVstCli:receiveSslData(#recvState{}, Socket) of
  93. {ok, MsgBin} ->
  94. case eVPack:decodeHeader(MsgBin) of
  95. [1, 2, 200, _] ->
  96. {ok, SrvState#srvState{dbName = DbName, socket = Socket, vstSize = VstSize}, CliState};
  97. _Err ->
  98. ?AgWarn(ServerName, "auth error: ~p~n", [_Err]),
  99. agAgencyUtils:reConnTimer(SrvState, CliState)
  100. end;
  101. {error, Reason} = Err ->
  102. ?AgWarn(ServerName, "recv auth error: ~p~n", [Reason]),
  103. Err
  104. end;
  105. {error, Reason} ->
  106. ?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
  107. agAgencyUtils:reConnTimer(SrvState, CliState)
  108. end;
  109. _Ret ->
  110. ?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
  111. end;
  112. handleMsg({'$gen_call', FromTag, '$SrvInfo'}, SrvState, CliState) ->
  113. {To, Tag} = FromTag,
  114. catch To ! {Tag, {erlang:get(), SrvState, CliState}},
  115. {ok, SrvState, CliState};
  116. handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
  117. ?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]),
  118. {ok, SrvState, CliState}.
  119. -spec terminate(term(), srvState(), cliState()) -> ok.
  120. terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
  121. {ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState),
  122. ssl:close(Socket),
  123. agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
  124. ok.
  125. -spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}.
  126. waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->
  127. case BacklogNum > 0 of
  128. true ->
  129. receive
  130. Msg ->
  131. {ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState),
  132. waitAllReqOver(NewSrvState, NewCliState)
  133. end;
  134. _ ->
  135. {ok, SrvState, CliState}
  136. end.