Przeglądaj źródła

适配 vst 协议

master
SisMaker 4 lat temu
rodzic
commit
417430044d
16 zmienionych plików z 361 dodań i 1089 usunięć
  1. +45
    -54
      include/agVstCli.hrl
  2. +7
    -6
      include/eArango.hrl
  3. +1
    -1
      rebar.config
  4. +4
    -4
      src/agVstCli/agAgencyPoolMgrIns.erl
  5. +14
    -14
      src/agVstCli/agAgencyUtils.erl
  6. +0
    -281
      src/agVstCli/agHttpProtocol.erl
  7. +11
    -40
      src/agVstCli/agMiscUtils.erl
  8. +59
    -59
      src/agVstCli/agSslAgencyIns.erl
  9. +77
    -82
      src/agVstCli/agTcpAgencyIns.erl
  10. +0
    -77
      src/agVstCli/agVstAgencyExm.erl
  11. +0
    -400
      src/agVstCli/agVstAgencyIns.erl
  12. +22
    -22
      src/agVstCli/agVstCli.erl
  13. +115
    -0
      src/agVstCli/agVstProtocol.erl
  14. +4
    -47
      src/agVstCli/vst.erl
  15. +1
    -1
      src/eArango_sup.erl
  16. +1
    -1
      src/user_default.erl

include/agHttpCli.hrl → include/agVstCli.hrl Wyświetl plik

@ -5,44 +5,51 @@
-define(agBeamPool, agBeamPool).
-define(agBeamAgency, agBeamAgency).
-define(AgCUndef, 0). %% Wait One Chunk start
-define(AgCHeader, 1). %% Wait One Chunk header
-define(AgCBody, 2). %% Wait One Chunk Body
-define(AgCDone, 3). %% Wait One Chunk Receive Over
-define(AgMDone, 4). %% Wait One Message Over
%%
-define(DEFAULT_BASE_URL, <<"http://127.0.0.1:8529">>).
-define(DEFAULT_DBNAME, <<"_system">>).
-define(DEFAULT_USER, <<"root">>).
-define(DEFAULT_PASSWORD, <<"156736">>).
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_CONNECT_TIMEOUT, 5000).
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_TIMEOUT, infinity).
-define(DEFAULT_PID, self()).
-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)).
-define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)).
-define(WARN(Tag, Format, Data), agMiscUtils:warnMsg(Tag, Format, Data)).
-define(miDoNetConnect, miDoNetConnect).
-define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>).
-define(AgDefDbName, <<"_system">>).
-define(AgDefUser, <<"root">>).
-define(AgDefPassWord, <<"156736">>).
-define(AgDefBacklogSize, 1024).
-define(AgDefConnTimeout, 5000).
-define(AgDefPoolSize, 16).
-define(AgDefIsReConn, true).
-define(AgDefReConnMin, 500).
-define(AgDefReConnMax, 120000).
-define(AgDefTimeout, infinity).
-define(AgDefPid, self()).
-define(AgDefSocketOpts, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(AgGetListKV(Key, List), agMiscUtils:getListValue(Key, List, undefined)).
-define(AgGetListKV(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)).
-define(AgWarn(Tag, Format, Data), agMiscUtils:warnMsg(Tag, Format, Data)).
-define(AgMDoNetConn, mDoNetConn).
-record(miRequest, {
method :: method()
, path :: path()
, queryPars :: queryPars()
, headers :: headers()
, body :: body()
, requestId :: tuple()
, messageId :: pos_integer()
, fromPid :: pid()
, overTime = infinity :: timeout()
, isSystem = false :: boolean()
}).
-record(miRequestRet, {
requestId :: requestId(),
messageId :: messageId(),
reply :: term()
}).
-record(reconnectState, {
-record(reConnState, {
min :: non_neg_integer(),
max :: non_neg_integer() | infinity,
current :: non_neg_integer() | undefined
@ -54,32 +61,16 @@
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | ssl:sslsocket(),
timerRef :: undefined | reference()
}).
-record(recvState, {
stage = header :: header | body | done, %% http(tcp)
contentLength :: undefined | non_neg_integer() | chunked,
statusCode :: undefined | 100..505,
headers :: undefined | [binary()],
buffer = <<>> :: binary(),
body = <<>> :: binary()
}).
-record(cliState, {
isHeadMethod = false :: boolean(), %% <<"HEAD">>
%method = undefined :: undefined | method(),
requestsIns = [] :: list(),
requestsOuts = [] :: list(),
revStatus = ?AgCUndef :: pos_integer(),
backlogNum = 0 :: integer(),
backlogSize = 0 :: integer(),
status = leisure :: waiting | leisure,
curInfo = undefined :: tuple(),
recvState = undefined :: recvState() | undefined
buffer = <<>> :: binary()
}).
-record(dbOpts, {
@ -102,41 +93,41 @@
-type miRequest() :: #miRequest{}.
-type miRequestRet() :: #miRequestRet{}.
-type recvState() :: #recvState{}.
-type srvState() :: #srvState{}.
-type cliState() :: #cliState{}.
-type reconnectState() :: #reconnectState{}.
-type reconnectState() :: #reConnState{}.
-type poolName() :: atom().
-type poolNameOrSocket() :: atom() | socket().
-type serverName() :: atom().
-type protocol() :: ssl | tcp.
-type method() :: binary().
-type headers() :: [{iodata(), iodata()}].
-type queryPars() :: map().
-type headers() :: map().
-type body() :: iodata() | undefined.
-type path() :: binary().
-type host() :: binary().
-type hostName() :: string().
-type poolSize() :: pos_integer().
-type backlogSize() :: pos_integer() | infinity.
-type requestId() :: {serverName(), reference()}.
-type messageId() :: pos_integer().
-type socket() :: inet:socket() | ssl:sslsocket().
-type socketOpts() :: [gen_tcp:connect_option() | ssl:tls_client_option()].
-type error() :: {error, term()}.
-type dbCfg() ::
{baseUrl, binary()} |
{dbName, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} |
{socketOpts, socketOpts()}.
{baseUrl, binary()} |
{dbName, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} |
{socketOpts, socketOpts()}.
-type agencyCfg() ::
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
-type dbCfgs() :: [dbCfg()].
-type dbOpts() :: #dbOpts{}.

+ 7
- 6
include/eArango.hrl Wyświetl plik

@ -1,6 +1,7 @@
-define(AgGet, <<"GET ">>).
-define(AgPut, <<"PUT ">>).
-define(AgPost, <<"POST ">>).
-define(AgHead, <<"HEAD ">>).
-define(AgPatch, <<"PATCH ">>).
-define(AgDelete, <<"DELETE ">>).
-define(AgDelete, 0).
-define(AgGet, 1).
-define(AgPost, 2).
-define(AgPut, 3).
-define(AgHead, 4). %% (not used in VPP)
-define(AgPatch, 5).
-define(AgOptions, 6). %% (not used in VPP)

+ 1
- 1
rebar.config Wyświetl plik

@ -1,7 +1,7 @@
{erl_opts, [{i, "include"}]}.
{edoc_opts, [{preprocess, true}]}.
{deps, [
%%{eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}},
{eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}}
%%{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}}
%% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}}
]}.

+ 4
- 4
src/agVstCli/agAgencyPoolMgrIns.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agAgencyPoolMgrIns).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -48,7 +48,7 @@ handleMsg({'$gen_call', From, {miStopPool, Name}}, State) ->
gen_server:reply(From, ok),
{ok, State};
handleMsg(_Msg, State) ->
?WARN(?MODULE, "receive unexpected msg: ~p", [_Msg]),
?AgWarn(?MODULE, "receive unexpected msg: ~p", [_Msg]),
{ok, State}.
terminate(_Reason, _State) ->
@ -133,13 +133,13 @@ stopChildren([AgencyName | T]) ->
ok ->
ok;
{error, TerReason} ->
?WARN(agAgencyPoolMgrIns, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason])
?AgWarn(agAgencyPoolMgrIns, ":terminate_child: ~p error reason: ~p ~n", [AgencyName, TerReason])
end,
case supervisor:delete_child(agAgencyPool_sup, AgencyName) of
ok ->
ok;
{error, DelReason} ->
?WARN(agAgencyPoolMgrIns, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason])
?AgWarn(agAgencyPoolMgrIns, ":delete_child: ~p error reason: ~p ~n", [AgencyName, DelReason])
end,
stopChildren(T);
stopChildren([]) ->

+ 14
- 14
src/agVstCli/agAgencyUtils.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agAgencyUtils).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -19,14 +19,14 @@
dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, curInfo = CurInfo} = ClientState, Reply) ->
agencyReply(CurInfo, Reply),
agencyReplyAll(RequestsOuts, RequestsIns, Reply),
reconnectTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, status = leisure, curInfo = undefined, recvState = undefined}).
reconnectTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}).
-spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->
#reconnectState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?miDoNetConnect),
#reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReconnectState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn),
{ok, SrvState#srvState{reconnectState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(term(), term()) -> ok.
@ -34,24 +34,24 @@ agencyReply({undefined, _RequestId, TimerRef}, _Reply) ->
agAgencyUtils:cancelTimer(TimerRef);
agencyReply({PidForm, RequestId, TimerRef}, Reply) ->
agAgencyUtils:cancelTimer(TimerRef),
catch PidForm ! #miRequestRet{requestId = RequestId, reply = Reply},
catch PidForm ! #miRequestRet{messageId = RequestId, reply = Reply},
ok;
agencyReply(undefined, _RequestRet) ->
ok.
-spec agencyReply(undefined | pid(), requestId(), undefined | reference(), term()) -> ok.
-spec agencyReply(undefined | pid(), messageId(), undefined | reference(), term()) -> ok.
agencyReply(undefined, _RequestId, TimerRef, _Reply) ->
agAgencyUtils:cancelTimer(TimerRef),
ok;
agencyReply(FormPid, RequestId, TimerRef, Reply) ->
agAgencyUtils:cancelTimer(TimerRef),
catch FormPid ! #miRequestRet{requestId = RequestId, reply = Reply},
catch FormPid ! #miRequestRet{messageId = RequestId, reply = Reply},
ok.
-spec agencyReplyAll(list(), list(), term()) -> ok.
agencyReplyAll(RequestsOuts, RequestsIns, Reply) ->
[agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- RequestsOuts],
[agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{requestId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)],
[agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{messageId = RequestId, fromPid = FormPid} <- RequestsOuts],
[agencyReply(FormPid, RequestId, undefined, Reply) || #miRequest{messageId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)],
ok.
-spec cancelTimer(undefined | reference()) -> ok.
@ -75,19 +75,19 @@ cancelTimer(TimerRef) ->
initReconnectState(IsReconnect, Min, Max) ->
case IsReconnect of
true ->
#reconnectState{min = Min, max = Max, current = Min};
#reConnState{min = Min, max = Max, current = Min};
false ->
undefined
end.
-spec resetReconnectState(undefined | reconnectState()) -> reconnectState() | undefined.
resetReconnectState(#reconnectState{min = Min} = ReconnectState) ->
ReconnectState#reconnectState{current = Min}.
resetReconnectState(#reConnState{min = Min} = ReconnectState) ->
ReconnectState#reConnState{current = Min}.
-spec updateReconnectState(reconnectState()) -> reconnectState().
updateReconnectState(#reconnectState{current = Current, max = Max} = ReconnectState) ->
updateReconnectState(#reConnState{current = Current, max = Max} = ReconnectState) ->
NewCurrent = Current + Current,
ReconnectState#reconnectState{current = minCur(NewCurrent, Max)}.
ReconnectState#reConnState{current = minCur(NewCurrent, Max)}.
minCur(A, B) when B >= A ->
A;

+ 0
- 281
src/agVstCli/agHttpProtocol.erl Wyświetl plik

@ -1,281 +0,0 @@
-module(agHttpProtocol).
-include("agHttpCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
headers/1
, request/7
, response/2
, response/5
]).
%% <<"Content-Type: application/json; charset=utf-8">>,
-spec request(boolean(), body(), method(), host(), binary(), path(), headers()) -> iolist().
request(true, undefined, Method, Host, _DbName, Path, Headers) ->
[
Method, <<"/_db/_system">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(false, undefined, Method, Host, DbName, Path, Headers) ->
[
Method, DbName, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nContent-Type: application/json; charset=utf-8\r\nContent-Length: 0\r\n">>,
spellHeaders(Headers), <<"\r\n">>
];
request(false, Body, Method, Host, DbName, Path, Headers) ->
ContentLength = integer_to_binary(iolist_size(Body)),
NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers],
[
Method, DbName, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nContent-Type: application/json; charset=utf-8\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
];
request(true, Body, Method, Host, _DbName, Path, Headers) ->
ContentLength = integer_to_binary(iolist_size(Body)),
NewHeaders = [{<<"Content-Length">>, ContentLength} | Headers],
[
Method, <<"/_db/_system">>, Path, <<" HTTP/1.1\r\nHost: ">>, Host,
<<"\r\nContent-Type: application/json; charset=utf-8\r\n">>,
spellHeaders(NewHeaders), <<"\r\n">>, Body
].
-spec response(binary(), boolean()) -> {ok, recvState(), binary()} | error().
response(Data, IsHeadMethod) ->
response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data, IsHeadMethod).
-spec response(undefined | recvState(), binary:cp(), binary:cp(), binary(), boolean()) -> {ok, recvState()} | error().
response(undefined, Rn, RnRn, Data, IsHeadMethod) ->
case parseStatusLine(Data, Rn) of
{StatusCode, Rest} ->
case splitHeaders(Rest, Rn, RnRn) of
{undefined, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}};
{0, Headers, Rest} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}};
{chunked, Headers, Body} ->
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode < 200 andalso StatusCode >= 100) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}};
_ ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Body, IsHeadMethod)
end;
{ContentLength, Headers, Body} ->
BodySize = erlang:size(Body),
if
BodySize == ContentLength ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
BodySize > ContentLength ->
?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true ->
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode < 200 andalso StatusCode >= 100) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end
end;
not_enough_data ->
{ok, #recvState{stage = header, body = Data}}
end;
not_enough_data ->
{ok, #recvState{stage = header, body = Data}};
{error, Reason} ->
{error, Reason}
end;
response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = Buffer} = RecvState, Rn, _RnRn, Data, _IsHeadMethod) ->
NewBuffer = <<Buffer/binary, Data/binary>>,
case parseChunks(NewBuffer, Rn, []) of
{ok, AddBody, _Rest} ->
LastBody = <<Body/binary, AddBody/binary>>,
{done, RecvState#recvState{stage = done, body = LastBody}};
{not_enough_data, AddBody, Rest} ->
NewBody = <<Body/binary, AddBody/binary>>,
{ok, RecvState#recvState{body = NewBody, buffer = Rest}};
{error, Reason} ->
{error, Reason}
end;
response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data, _IsHeadMethod) ->
CurData = <<Body/binary, Data/binary>>,
BodySize = erlang:size(CurData),
if
BodySize == ContentLength ->
{done, RecvState#recvState{stage = done, body = CurData}};
BodySize > ContentLength ->
?WARN(agTcpAgencyIns, "22 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]),
{done, #recvState{stage = done, body = CurData}};
true ->
{ok, RecvState#recvState{body = CurData}}
end;
response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data, IsHeadMethod) ->
CurBody = <<OldBody/binary, Data/binary>>,
case parseStatusLine(CurBody, Rn) of
{StatusCode, Rest} ->
case splitHeaders(Rest, Rn, RnRn) of
{undefined, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = undefined, body = Body}};
{0, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}};
{chunked, Headers, Rest} ->
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode < 200 andalso StatusCode >= 100) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = <<>>}};
_ ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Rest, IsHeadMethod)
end;
{ContentLength, Headers, Body} ->
BodySize = erlang:size(Body),
if
BodySize == ContentLength ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
BodySize > ContentLength ->
?WARN(agTcpAgencyIns, "33 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true ->
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode < 200 andalso StatusCode >= 100) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end
end;
not_enough_data ->
{ok, #recvState{stage = header, body = CurBody}}
end;
not_enough_data ->
{ok, #recvState{stage = header, body = CurBody}};
{error, Reason} ->
{error, Reason}
end.
spellHeaders(Headers) ->
<<<<Key/binary, ": ", Value/binary, "\r\n">> || {Key, Value} <- Headers>>.
splitHeaders(Data, Rn, RnRn) ->
case binary:split(Data, RnRn) of
[Data] ->
not_enough_data;
[Headers, Body] ->
HeadersList = binary:split(Headers, Rn, [global]),
ContentLength = contentLength(HeadersList),
{ContentLength, Headers, Body}
end.
contentLength([]) ->
undefined;
contentLength([<<"Content-Length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
contentLength([<<"content-length: ", Rest/binary>> | _T]) ->
binary_to_integer(Rest);
contentLength([<<"Transfer-Encoding: chunked">> | _T]) ->
chunked;
contentLength([<<"transfer-encoding: chunked">> | _T]) ->
chunked;
contentLength([_ | T]) ->
contentLength(T).
parseStatusLine(Data, Rn) ->
case binary:split(Data, Rn) of
[Data] ->
not_enough_data;
[Line, Rest] ->
case parseStatusReason(Line) of
{ok, StatusCode} ->
{StatusCode, Rest};
{error, Reason} ->
{error, Reason}
end
end.
parseStatusReason(<<"HTTP/1.1 200 OK">>) ->
{ok, 200};
parseStatusReason(<<"HTTP/1.1 204 No Content">>) ->
{ok, 204};
parseStatusReason(<<"HTTP/1.1 301 Moved Permanently">>) ->
{ok, 301};
parseStatusReason(<<"HTTP/1.1 302 Found">>) ->
{ok, 302};
parseStatusReason(<<"HTTP/1.1 403 Forbidden">>) ->
{ok, 403};
parseStatusReason(<<"HTTP/1.1 404 Not Found">>) ->
{ok, 404};
parseStatusReason(<<"HTTP/1.1 500 Internal Server Error">>) ->
{ok, 500};
parseStatusReason(<<"HTTP/1.1 502 Bad Gateway">>) ->
{ok, 502};
parseStatusReason(<<"HTTP/1.1 ", N1, N2, N3, " ", _Reason/bits>>)
when $0 =< N1, N1 =< $9,
$0 =< N2, N2 =< $9,
$0 =< N3, N3 =< $9 ->
StatusCode = (N1 - $0) * 100 + (N2 - $0) * 10 + (N3 - $0),
{ok, StatusCode};
parseStatusReason(<<"HTTP/1.0 ", _/binary>>) ->
{error, unsupported_feature};
parseStatusReason(_) ->
{error, bad_request}.
parseChunks(Data, Rn, Acc) ->
case parseChunk(Data, Rn) of
done ->
{ok, iolist_to_binary(lists:reverse(Acc)), <<>>};
{ok, Body, Rest} ->
parseChunks(Rest, Rn, [Body | Acc]);
not_enough_data ->
{not_enough_data, iolist_to_binary(lists:reverse(Acc)), Data};
{error, Reason} ->
{error, Reason}
end.
parseChunk(Data, Rn) ->
case binary:split(Data, Rn) of
[Size, Rest] ->
case parseChunkSize(Size) of
undefined ->
{error, invalid_chunk_size};
0 ->
done;
HexSize ->
parseChunkBody(Rest, HexSize)
end;
[Data] ->
not_enough_data
end.
parseChunkBody(Data, Size) ->
case Data of
<<Body:Size/binary, "\r\n", Rest/binary>> ->
{ok, Body, Rest};
_ ->
not_enough_data
end.
parseChunkSize(Bin) ->
try
binary_to_integer(Bin, 16)
catch
error:badarg ->
undefined
end.
-spec headers(recvState()) -> {ok, headers()} | {error, invalidHeaders}.
headers(#recvState{headers = Headers}) ->
parseHeaders(Headers, []).
parseHeaders([], Acc) ->
{ok, lists:reverse(Acc)};
parseHeaders([Header | T], Acc) ->
case binary:split(Header, <<":">>) of
[Header] ->
{error, invalidHeaders};
[Key, <<>>] ->
parseHeaders(T, [{Key, undefined} | Acc]);
[Key, <<" ", Value/binary>>] ->
parseHeaders(T, [{Key, Value} | Acc])
end.

+ 11
- 40
src/agVstCli/agMiscUtils.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agMiscUtils).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -12,9 +12,6 @@
, getListValue/3
, randomElement/1
, toBinary/1
, spellQueryPars/1
, getHeaderValue/2
, lookHeader/2
]).
-spec parseUrl(binary()) -> dbOpts() | {error, invalidUrl}.
@ -51,22 +48,22 @@ parseUrl(Protocol, Rest) ->
-spec dbOpts(list()) -> dbOpts().
dbOpts(DbCfgs) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME),
User = ?GET_FROM_LIST(user, DbCfgs, ?DEFAULT_USER),
Password = ?GET_FROM_LIST(password, DbCfgs, ?DEFAULT_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE),
SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS),
BaseUrl = ?AgGetListKV(baseUrl, DbCfgs, ?AgDefBaseUrl),
DbName = ?AgGetListKV(dbName, DbCfgs, ?AgDefDbName),
User = ?AgGetListKV(user, DbCfgs, ?AgDefUser),
Password = ?AgGetListKV(password, DbCfgs, ?AgDefPassWord),
PoolSize = ?AgGetListKV(poolSize, DbCfgs, ?AgDefPoolSize),
SocketOpts = ?AgGetListKV(socketOpts, DbCfgs, ?AgDefSocketOpts),
DbOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(<<User/binary, ":", Password/binary>>))/binary>>},
DbOpts#dbOpts{dbName = <<"/_db/", DbName/binary>>, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
-spec agencyOpts(list()) -> agencyOpts().
agencyOpts(AgencyCfgs) ->
IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_IS_RECONNECT),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?DEFAULT_BACKLOG_SIZE),
Min = ?GET_FROM_LIST(reconnectTimeMin, AgencyCfgs, ?DEFAULT_RECONNECT_MIN),
Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_RECONNECT_MAX),
IsReconnect = ?AgGetListKV(reconnect, AgencyCfgs, ?AgDefIsReConn),
BacklogSize = ?AgGetListKV(backlogSize, AgencyCfgs, ?AgDefBacklogSize),
Min = ?AgGetListKV(reconnectTimeMin, AgencyCfgs, ?AgDefReConnMin),
Max = ?AgGetListKV(reconnectTimeMax, AgencyCfgs, ?AgDefReConnMax),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}.
-spec getListValue(term(), list(), term()) -> term().
@ -99,29 +96,3 @@ toBinary([Tuple | PropList] = Value) when is_list(PropList) and is_tuple(Tuple)
lists:map(fun({K, V}) -> {toBinary(K), toBinary(V)} end, Value);
toBinary(Value) -> term_to_binary(Value).
-spec spellQueryPars(list()) -> binary().
spellQueryPars([]) ->
<<>>;
spellQueryPars([{Key, Value}]) ->
<<"?", (toBinary(Key))/binary, "=", (toBinary(Value))/binary>>;
spellQueryPars([{Key, Value} | Tail]) ->
FirstBinary = <<"?", (toBinary(Key))/binary, "=", (toBinary(Value))/binary>>,
TailBinary = <<<<"&", (toBinary(OtherKey))/binary, "=", (toBinary(OtherValue))/binary>> || {OtherKey, OtherValue} <- Tail>>,
<<FirstBinary/binary, TailBinary/binary>>.
-spec getHeaderValue(binary(), binary()) -> binary().
getHeaderValue(Header, HeaderBin) ->
HeadersList = binary:split(HeaderBin, <<"\r\n">>, [global]),
lookHeader(Header, HeadersList).
-spec lookHeader(binary, list()) -> binary().
lookHeader(_Header, []) ->
undefined;
lookHeader(Header, [H | T]) ->
case binary:split(H, <<": ">>) of
[Header, Value] ->
Value;
_ ->
lookHeader(Header, T)
end.

+ 59
- 59
src/agVstCli/agSslAgencyIns.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agSslAgencyIns).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-include("eArango.hrl").
-compile(inline).
@ -15,13 +15,13 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) ->
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, revStatus = Status} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
@ -29,13 +29,13 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
_ ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -43,11 +43,11 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
@ -60,58 +60,58 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
handleMsg({ssl, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?WARN(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
?AgWarn(ServerName, "handle ssl data error: ~p ~p ~n", [Reason, CurInfo]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
?AgWarn(ServerName, "handle ssl data crash: ~p:~p~n~p~n ~p~n ", [E, R, S, CurInfo]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {{error, agencyHandledataError}})
end;
handleMsg({timeout, TimerRef, waiting_over},
handleMsg({timeout, TimerRef, mWaitingOver},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% ssl ssl收到该次超时数据
ssl:close(Socket),
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg(?AgMDoNetConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
?AgWarn(ServerName, "connection closed~n", []),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, ssl_closed});
handleMsg({ssl_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?miDoNetConnect,
handleMsg(?AgMDoNetConn,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
@ -124,26 +124,26 @@ handleMsg(?miDoNetConnect,
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
@ -156,7 +156,7 @@ terminate(_Reason,
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) ->
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
@ -180,7 +180,7 @@ overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Reques
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
@ -204,7 +204,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers,
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -212,11 +212,11 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers,
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}});
overReceiveSslData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
@ -228,38 +228,38 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{ssl, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveSslData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?WARN(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]),
?AgWarn(overReceiveSslData, "handle ssl data error: ~p ~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {sslDataError, Reason}})
catch
E:R:S ->
?WARN(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]),
?AgWarn(overReceiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, handledataError}})
end;
{timeout, TimerRef, waiting_over} ->
{timeout, TimerRef, mWaitingOver} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
ssl:close(Socket),
@ -269,13 +269,13 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -288,7 +288,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -301,7 +301,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {new_ssl_connect_error_over, _Reason}})
end;
@ -313,7 +313,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -322,7 +322,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
end
end;
_ ->
?WARN(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
?AgWarn(overReceiveSslData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveSslData(SrvState, CliState)
end;
{ssl_closed, Socket} ->
@ -334,7 +334,7 @@ overReceiveSslData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#miRequest{} = MiRequest ->
overReceiveSslData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?WARN(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]),
?AgWarn(overReceiveSslData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveSslData(SrvState, CliState)
end.
@ -343,20 +343,20 @@ dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case ssl:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
case ssl:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p~n", [Reason]),
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
@ -380,7 +380,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -388,11 +388,11 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})

+ 77
- 82
src/agVstCli/agTcpAgencyIns.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agTcpAgencyIns).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-include("eArango.hrl").
-compile(inline).
@ -15,102 +15,97 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
self() ! ?AgMDoNetConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) ->
handleMsg(#miRequest{method = Method, path = Path, queryPars = QueryPars, headers = Headers, body = Body, messageId = MessageId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
?AgWarn(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end;
_ ->
{ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}}
Request = agVstProtocol:request(IsSystem, Method, DbName, Path, QueryPars, Headers, Body),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef = case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), {mWaitingOver, MessageId}, [{abs, true}])
end,
erlang:put(MessageId, {TimerRef, 0, <<>>}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum + 1}};
{error, Reason} ->
?AgWarn(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, MessageId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end
end
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
#srvState{serverName = ServerName, socket = Socket} = SrvState,
#cliState{revStatus = RevStatus, backlogNum = BacklogNum, buffer = Buffer} = CliState) ->
try agVstProtocol:response(RevStatus, Buffer, Data) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
?AgWarn(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
?AgWarn(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError})
end;
handleMsg({timeout, TimerRef, waiting_over},
handleMsg({timeout, TimerRef, mWaitingOver},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% tcp tcp收到该次超时数据
gen_tcp:close(Socket),
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg(?AgMDoNetConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
?AgWarn(ServerName, "connection closed~n", []),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?miDoNetConnect,
handleMsg(?AgMDoNetConn,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
@ -123,26 +118,26 @@ handleMsg(?miDoNetConnect,
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
?AgWarn(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
?AgWarn(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
@ -155,7 +150,7 @@ terminate(_Reason,
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) ->
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, revStatus = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
@ -179,7 +174,7 @@ overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = Reques
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
@ -203,7 +198,7 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers,
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
@ -211,11 +206,11 @@ overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers,
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}});
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
@ -227,38 +222,38 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
?AgWarn(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
?AgWarn(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}})
end;
{timeout, TimerRef, waiting_over} ->
{timeout, TimerRef, mWaitingOver} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
gen_tcp:close(Socket),
@ -268,13 +263,13 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, revStatus = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -287,7 +282,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -300,7 +295,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -312,7 +307,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, revStatus = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
@ -321,7 +316,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
end
end;
_ ->
?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
?AgWarn(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveTcpData(SrvState, CliState)
end;
{tcp_closed, Socket} ->
@ -333,7 +328,7 @@ overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn =
#miRequest{} = MiRequest ->
overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
?AgWarn(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveTcpData(SrvState, CliState)
end.
@ -342,20 +337,20 @@ dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
case gen_tcp:connect(Ip, Port, SocketOptions, ?AgDefConnTimeout) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p~n", [Reason]),
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
?AgWarn(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
@ -379,7 +374,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
@ -387,11 +382,11 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
erlang:start_timer(OverTime, self(), mWaitingOver, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, revStatus = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
?AgWarn(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})

+ 0
- 77
src/agVstCli/agVstAgencyExm.erl Wyświetl plik

@ -1,77 +0,0 @@
-module(agVstAgencyExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agTcpAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agVstAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agTcpAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 0
- 400
src/agVstCli/agVstAgencyIns.erl Wyświetl plik

@ -1,400 +0,0 @@
-module(agVstAgencyIns).
-include("agHttpCli.hrl").
-include("eArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end;
_ ->
{ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}}
end
end
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError})
end;
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% tcp tcp收到该次超时数据
gen_tcp:close(Socket),
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{socket = Socket} = SrvState,
CliState) ->
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs})
end;
_ ->
overReceiveTcpData(SrvState, CliState)
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.
-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}})
end;
{timeout, TimerRef, waiting_over} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
[MiRequest | Outs] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
_ ->
?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveTcpData(SrvState, CliState)
end;
{tcp_closed, Socket} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
{tcp_error, Socket, Reason} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
#miRequest{} = MiRequest ->
overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveTcpData(SrvState, CliState)
end.
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.

+ 22
- 22
src/agVstCli/agVstCli.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(agVstCli).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-include("eArango.hrl").
-compile(inline).
@ -48,19 +48,19 @@ callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
Ret
end.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, requestId()} | {error, atom()}.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?DEFAULT_TIMEOUT).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> {ok, requestId()} | {error, atom()}.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?DEFAULT_TIMEOUT).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> {ok, requestId()} | {error, atom()}.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) -> {ok, requestId()} | {error, atom()}.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout) ->
OverTime =
case Timeout of
@ -78,20 +78,20 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
AgencyName ->
MonitorRef = erlang:monitor(process, AgencyName),
RequestId = {AgencyName, MonitorRef},
catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{waitRRT, RequestId, MonitorRef}
end;
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, UserPassWord, Host, Protocol} ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
ok ->
receiveTcpData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
{error, Reason} = Err ->
?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
?AgWarn(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
Err
end;
@ -100,7 +100,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
ok ->
receiveSslData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
{error, Reason} = Err ->
?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]),
?AgWarn(castAgency, ":ssl send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
Err
end
@ -110,10 +110,10 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
end
end.
-spec receiveRequestRet(requestId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: binary(), Headers :: binary()} | {error, term()}.
-spec receiveRequestRet(messageId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: binary(), Headers :: binary()} | {error, term()}.
receiveRequestRet(RequestId, MonitorRef) ->
receive
#miRequestRet{requestId = RequestId, reply = Reply} ->
#miRequestRet{messageId = RequestId, reply = Reply} ->
erlang:demonitor(MonitorRef),
case Reply of
{_StatusCode, Body, _Headers} ->
@ -134,7 +134,7 @@ receiveRequestRet(RequestId, MonitorRef) ->
receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->
@ -145,12 +145,12 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
?AgWarn(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {tcpDataError, Reason}}
catch
E:R:S ->
?WARN(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
?AgWarn(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledataError}
end;
@ -166,7 +166,7 @@ receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive
{ssl, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->
@ -177,12 +177,12 @@ receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
{ok, NewRecvState} ->
receiveSslData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
?AgWarn(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {sslDataError, Reason}}
catch
E:R:S ->
?WARN(receiveSslData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
?AgWarn(receiveSslData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledataError}
end;
@ -222,26 +222,26 @@ connectDb(DbCfgs) ->
Ip = agMiscUtils:randomElement(IPList),
case Protocol of
tcp ->
case gen_tcp:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of
case gen_tcp:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?WARN(connectDb, "connect error: ~p~n", [Reason]),
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(Ip, Port, SocketOpts, ?DEFAULT_CONNECT_TIMEOUT) of
case ssl:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?WARN(connectDb, "connect error: ~p~n", [Reason]),
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end
end;
{error, Reason} = Err ->
?WARN(connectDb, "getaddrs error: ~p~n", [Reason]),
?AgWarn(connectDb, "getaddrs error: ~p~n", [Reason]),
Err
end.

+ 115
- 0
src/agVstCli/agVstProtocol.erl Wyświetl plik

@ -0,0 +1,115 @@
-module(agVstProtocol).
-include("agVstCli.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
request/7
, response/3
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, DbName, Method, Path, QueryPars, Headers]), Body];
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(undefined | recvState(), binary()) -> {ok, recvState()} | error().
response(?AgCUndef, Buffer, Data) ->
case Data of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
erlang:put(MessageId, {ChunkX, LeftBin});
true ->
case erlang:get(MessageId) of
{ChunkX, DataBin} ->
{?AgMDone, <<DataBin/binary, LeftBin/binary>>};
{SumChunk, DataBin} ->
erlang:put(MessageId, {SumChunk, <<DataBin/binary, LeftBin/binary>>}),
{?AgCBody, Data};
_ ->
throw(error_happen)
end
end;
ByteSize < ChunkSize ->
{?AgCBody, Data};
true ->
throw(error_bad_size)
end;
_ ->
{?AgCHeader, Data}
end;
response(?AgCHeader, Buffer, Data) ->
NewData = <<Buffer/binary, Data/binary>>,
case NewData of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
erlang:put(MessageId, {ChunkX, LeftBin});
true ->
case erlang:get(MessageId) of
{ChunkX, DataBin} ->
{?AgMDone, <<DataBin/binary, LeftBin/binary>>};
{SumChunk, DataBin} ->
erlang:put(MessageId, {SumChunk, <<DataBin/binary, LeftBin/binary>>}),
{?AgCBody, Data};
_ ->
throw(error_happen)
end
end;
ByteSize < ChunkSize ->
{?AgCBody, Data};
true ->
throw(error_bad_size)
end;
_ ->
{?AgCHeader, Data}
end;
response(?AgCUndef, Buffer, Data) ->
case Data of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, MessageLength:64/integer-little-unsigned, LeftBin/binary>> ->
ByteSize = erlang:byte_size(LeftBin),
ChunkSize = Length - 24,
if
ByteSize == ChunkSize ->
if
IsFirst == ChunkX ->
{?AgMDone, LeftBin};
IsFirst == 1 ->
erlang:put(MessageId, {ChunkX, LeftBin});
true ->
case erlang:get(MessageId) of
{ChunkX, DataBin} ->
{?AgMDone, <<DataBin/binary, LeftBin/binary>>};
{SumChunk, DataBin} ->
erlang:put(MessageId, {SumChunk, <<DataBin/binary, LeftBin/binary>>}),
{?AgCBody, Data};
_ ->
throw(error_happen)
end
end;
ByteSize < ChunkSize ->
{?AgCBody, Data};
true ->
throw(error_bad_size)
end;
_ ->
{?AgCHeader, Data}
end;

+ 4
- 47
src/agVstCli/vst.erl Wyświetl plik

@ -7,46 +7,6 @@
vst_maxsize/0
]).
assign_map(_acc@1, _key@1, _value@1) ->
case _acc@1 of
#{_key@1 := _} -> _acc@1;
#{} -> _acc@1#{_key@1 => _value@1};
_ -> #{_key@1 => _value@1}
end.
assign_split([<<>>, _rest@1], _value@1, _acc@1, _pattern@1) ->
_parts@1 = binary:split(_rest@1, _pattern@1),
case _acc@1 of
[_ | _] ->
[assign_split(_parts@1, _value@1, none, _pattern@1) | _acc@1];
none ->
[assign_split(_parts@1, _value@1, none, _pattern@1)];
_ -> _acc@1
end;
assign_split([_key@1, _rest@1], _value@1, _acc@1, _pattern@1) ->
_parts@1 = binary:split(_rest@1, _pattern@1),
case _acc@1 of
#{_key@1 := _current@1} ->
_acc@1#{_key@1 =>
assign_split(_parts@1, _value@1, _current@1, _pattern@1)};
#{} ->
_acc@1#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)};
_ ->
#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)}
end;
assign_split([<<>>], nil, _acc@1, __pattern@1) ->
case _acc@1 of
[_ | _] -> _acc@1;
_ -> []
end;
assign_split([<<>>], _value@1, _acc@1, __pattern@1) ->
case _acc@1 of
[_ | _] -> [_value@1 | _acc@1];
none -> [_value@1];
_ -> _acc@1
end;
assign_split([_key@1], _value@1, _acc@1, __pattern@1) ->
assign_map(_acc@1, _key@1, _value@1).
authorize(#{socket := _socket@1, username := _un@1, password := _pw@1} = _state@1) ->
case eVPack:encode([1, 1000, <<"plain">>, _un@1, _pw@1]) of
@ -179,10 +139,8 @@ body_from(_body@1) -> _body@1.
build_stream(_message@1) ->
case chunk_every(_message@1, 30696) of
[_first_chunk@1 | _rest_chunks@1] ->
_n_chunks@1 = erlang:length([_first_chunk@1
| _rest_chunks@1]),
_msg_length@1 = erlang:byte_size(_message@1) +
_n_chunks@1 * 24,
_n_chunks@1 = erlang:length([_first_chunk@1 | _rest_chunks@1]),
_msg_length@1 = erlang:byte_size(_message@1) + _n_chunks@1 * 24,
_rest_chunks@2 =
lists:reverse(lists:flodl(
fun(_n@1, _@1) ->
@ -273,14 +231,13 @@ port_for({tcp, __host@1, _port@1}) -> _port@1.
prepend_chunk(_chunk@1, _chunk_n@1, _is_first@1,
_msg_id@1, _msg_length@1) ->
<<(24 + erlang:byte_size(_chunk@1)):32/integer-little,
(binary:decode_unsigned(<<_chunk_n@1:31/integer,
_is_first@1:1/integer>>,
little)):32/integer,
(binary:decode_unsigned(<<_chunk_n@1:31/integer, _is_first@1:1/integer-little>>)):32/integer,
_msg_id@1:64/integer-little,
_msg_length@1:64/integer-little, _chunk@1/binary>>.
query_for(nil) -> #{}.
recv_chunk({_mod@1, _port@1}, _chunk_length@1) ->
_mod@1:recv(_port@1, _chunk_length@1 - 24).

+ 1
- 1
src/eArango_sup.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(eArango_sup).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-include("eArango.hrl").
-behaviour(supervisor).

+ 1
- 1
src/user_default.erl Wyświetl plik

@ -1,5 +1,5 @@
-module(user_default).
-include("agHttpCli.hrl").
-include("agVstCli.hrl").
-compile([export_all, nowarn_export_all]).

Ładowanie…
Anuluj
Zapisz