浏览代码

适配 vst 协议

master
SisMaker 4 年前
父节点
当前提交
7fc78ca9ff
共有 14 个文件被更改,包括 244 次插入365 次删除
  1. +1
    -1
      README.md
  2. +20
    -65
      include/agVstCli.hrl
  3. +1
    -1
      src/agApi/agMiscFuns.erl
  4. +31
    -34
      src/agVstCli/agAgencyUtils.erl
  5. +4
    -3
      src/agVstCli/agMiscUtils.erl
  6. +0
    -0
      src/agVstCli/agSslAgencyExm.erl1
  7. +3
    -3
      src/agVstCli/agSslAgencyIns.erl1
  8. +33
    -12
      src/agVstCli/agTcpAgencyIns.erl
  9. +106
    -99
      src/agVstCli/agVstCli.erl
  10. +14
    -21
      src/agVstCli/agVstProtoPl.erl
  11. +26
    -121
      src/agVstCli/agVstProtoSp.erl
  12. +0
    -0
      src/agVstCli/vst.erl1
  13. +2
    -2
      src/eArango.app.src
  14. +3
    -3
      src/user_default.erl

+ 1
- 1
README.md 查看文件

@ -33,7 +33,7 @@
agMgrDb:curDbInfo(Socket).
Connection pooling mode
application:ensure_all_started(erlArango). %% start app
application:ensure_all_started(eArango). %% start app
agVstCli:startPool(poolName, [], []). %% start pool
%% Then you can then invoke various apis using poolName as the first argument
agMgrDb:curDbInfo(poolName).

+ 20
- 65
include/agVstCli.hrl 查看文件

@ -10,21 +10,25 @@
-define(AgCBody, 2). %% Wait One Chunk Body
-define(AgCBodyStart, 3). %% Ret Start Wait One Chunk Body
-define(AgCBodyGoOn, 4). %% Ret Go On Wait One Chunk Body
-define(AgCDone, 5). %% receve one Chunk done
-define(AgMDone, 6). %% receve one message done
%% IMY-todo
%% pidFrom pid() to reply; undefiend discard; waitSend requester来获取
%% pidFrom pid() to reply; undefiend discard; timeOut discard
-record(msgIdCache, {pidFrom, timerRef, chunkCnt, msgBuffer}).
-define(AgMBIdx, 4).
-define(AgCCIdx, 3).
-define(AgTRIdx, 2).
-define(AgPFIdx, 1).
-define(AgHeaderSize, 24).
%%
-define(AgDefBaseUrl, <<"http://127.0.0.1:8529">>).
-define(AgDefBaseUrl, <<"http://192.168.0.88:8529">>).
-define(AgDefDbName, <<"_system">>).
-define(AgDefUser, <<"root">>).
-define(AgDefPassWord, <<"156736">>).
-define(AgDefBacklogSize, 1024).
-define(AgDefConnTimeout, 5000).
-define(AgDefPoolSize, 16).
@ -32,6 +36,8 @@
-define(AgDefReConnMin, 500).
-define(AgDefReConnMax, 120000).
-define(AgDefTimeout, infinity).
-define(AgDefVstSize, 3145728).
-define(AgDefAgencySlg, poll). %% bind rand poll
-define(AgDefPid, self()).
-define(AgDefSocketOpts, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
@ -39,7 +45,8 @@
-define(AgGetListKV(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)).
-define(AgWarn(Tag, Format, Data), agMiscUtils:warnMsg(Tag, Format, Data)).
-define(AgMDoNetConn, mDoNetConn).
-define(AgMDoDBConn, mDoDBConn).
-define(AgUpgradeInfo, <<"VST/1.1\r\n\r\n">>).
-record(agReq, {
method :: method()
@ -67,8 +74,6 @@
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
reConnState :: undefined | reConnState(),
socket :: undefined | ssl:sslsocket(),
@ -88,6 +93,7 @@
-record(recvState, {
revStatus = ?AgUndef :: pos_integer(),
messageId = 0 :: pos_integer(),
chunkCnt = -1 :: integer(),
msgBuffer = <<>> :: binary(),
chunkIdx = 0 :: pos_integer(),
chunkSize = 0 :: pos_integer(),
@ -101,12 +107,15 @@
dbName :: binary(),
protocol :: protocol(),
poolSize :: poolSize(),
userPassword :: binary(),
user :: binary(),
password :: binary(),
socketOpts :: socketOpts()
}).
-record(agencyOpts, {
reconnect :: boolean(),
vstSize :: pos_integer(),
agencySlg :: agencySlg(),
backlogSize :: backlogSize(),
reConnTimeMin :: pos_integer(),
reConnTimeMax :: pos_integer()
@ -117,6 +126,7 @@
-type srvState() :: #srvState{}.
-type cliState() :: #cliState{}.
-type reConnState() :: #reConnState{}.
-type recvState() :: #recvState{}.
-type poolName() :: atom().
-type poolNameOrSocket() :: atom() | socket().
@ -130,6 +140,7 @@
-type host() :: binary().
-type hostName() :: string().
-type poolSize() :: pos_integer().
-type agencySlg() :: bind | rand | poll.
-type backlogSize() :: pos_integer() | infinity.
-type messageId() :: pos_integer().
-type socket() :: inet:socket() | ssl:sslsocket().
@ -146,6 +157,8 @@
-type agencyCfg() ::
{reconnect, boolean()} |
{vstSize, pos_integer()} |
{agencySlg, agencySlg()} |
{backlogSize, backlogSize()} |
{reConnTimeMin, pos_integer()} |
{reConnTimeMax, pos_integer()}.
@ -154,61 +167,3 @@
-type dbOpts() :: #dbOpts{}.
-type agencyCfgs() :: [agencyCfg()].
-type agencyOpts() :: #agencyOpts{}.
%% http header
%% -type header() ::
%% 'Cache-Control' |
%% 'Connection' |
%% 'Date' |
%% 'Pragma'|
%% 'Transfer-Encoding' |
%% 'Upgrade' |
%% 'Via' |
%% 'Accept' |
%% 'Accept-Charset'|
%% 'Accept-Encoding' |
%% 'Accept-Language' |
%% 'Authorization' |
%% 'From' |
%% 'Host' |
%% 'If-Modified-Since' |
%% 'If-Match' |
%% 'If-None-Match' |
%% 'If-Range'|
%% 'If-Unmodified-Since' |
%% 'Max-Forwards' |
%% 'Proxy-Authorization' |
%% 'Range'|
%% 'Referer' |
%% 'User-Agent' |
%% 'Age' |
%% 'Location' |
%% 'Proxy-Authenticate'|
%% 'Public' |
%% 'Retry-After' |
%% 'Server' |
%% 'Vary' |
%% 'Warning'|
%% 'Www-Authenticate' |
%% 'Allow' |
%% 'Content-Base' |
%% 'Content-Encoding'|
%% 'Content-Language' |
%% 'Content-Length' |
%% 'Content-Location'|
%% 'Content-Md5' |
%% 'Content-Range' |
%% 'Content-Type' |
%% 'Etag'|
%% 'Expires' |
%% 'Last-Modified' |
%% 'Accept-Ranges' |
%% 'Set-Cookie'|
%% 'Set-Cookie2' |
%% 'X-Forwarded-For' |
%% 'Cookie' |
%% 'Keep-Alive' |
%% 'Proxy-Connection' |
%% binary() |
%% string().

+ 1
- 1
src/agApi/agMiscFuns.erl 查看文件

@ -138,7 +138,7 @@ getTransactions(PoolNameOrSocket) ->
% codeHTTP状态码
% timeUnix时间戳记为单位
curDbTime(PoolNameOrSocket) ->
agVstCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_admin/time">>, [], undefined).
agVstCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_admin/time">>, #{}, #{}, <<>>).
%
%

+ 31
- 34
src/agVstCli/agAgencyUtils.erl 查看文件

@ -8,49 +8,51 @@
cancelTimer/1
, dealClose/3
, reConnTimer/2
, agencyReply/2
, agencyReply/4
, agencyReTimeout/3
, initReConnState/3
, resetReConnState/1
, updateReConnState/1
]).
-spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}.
dealClose(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, curInfo = CurInfo} = ClientState, Reply) ->
agencyReply(CurInfo, Reply),
agencyReplyAll(RequestsOuts, RequestsIns, Reply),
reConnTimer(SrvState, ClientState#cliState{requestsIns = [], requestsOuts = [], backlogNum = 0, revStatus = leisure, curInfo = undefined, recvState = undefined}).
dealClose(SrvState, ClientState, Reply) ->
agencyReplyAll(Reply),
reConnTimer(SrvState, ClientState#cliState{backlogNum = 0, revStatus = ?AgUndef}).
-spec reConnTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reConnTimer(#srvState{reConnState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reConnTimer(#srvState{reConnState = ReconnectState} = SrvState, CliState) ->
#reConnState{current = Current} = MewReconnectState = agAgencyUtils:updateReConnState(ReconnectState),
TimerRef = erlang:send_after(Current, self(), ?AgMDoNetConn),
{ok, SrvState#srvState{reConnState = MewReconnectState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(term(), term()) -> ok.
agencyReply({undefined, _RequestId, TimerRef}, _Reply) ->
agAgencyUtils:cancelTimer(TimerRef);
agencyReply({PidForm, RequestId, TimerRef}, Reply) ->
agAgencyUtils:cancelTimer(TimerRef),
catch PidForm ! #agReqRet{messageId = RequestId, reply = Reply},
ok;
agencyReply(undefined, _RequestRet) ->
ok.
reConnTimer(#srvState{reConnState = ReConnState} = SrvState, CliState) ->
#reConnState{current = Current} = MewReConnState = agAgencyUtils:updateReConnState(ReConnState),
TimerRef = erlang:send_after(Current, self(), ?AgMDoDBConn),
{ok, SrvState#srvState{reConnState = MewReConnState, socket = undefined, timerRef = TimerRef}, CliState}.
-spec agencyReply(undefined | pid(), messageId(), undefined | reference(), term()) -> ok.
agencyReply(undefined, MessageId, TimerRef, _Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef);
agencyReply(timeOut, MessageId, TimerRef, _Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef);
agencyReply(FormPid, RequestId, TimerRef, Reply) ->
agencyReply(FormPid, MessageId, TimerRef, Reply) ->
erlang:erase(MessageId),
agAgencyUtils:cancelTimer(TimerRef),
catch FormPid ! #agReqRet{messageId = RequestId, reply = Reply},
catch FormPid ! #agReqRet{messageId = MessageId, reply = Reply},
ok.
-spec agencyReplyAll(list(), list(), term()) -> ok.
agencyReplyAll(RequestsOuts, RequestsIns, Reply) ->
[agencyReply(FormPid, RequestId, undefined, Reply) || #agReq{messageId = RequestId, fromPid = FormPid} <- RequestsOuts],
[agencyReply(FormPid, RequestId, undefined, Reply) || #agReq{messageId = RequestId, fromPid = FormPid} <- lists:reverse(RequestsIns)],
-spec agencyReTimeout(undefined | pid(), messageId(), term()) -> ok.
agencyReTimeout(undefined, _MessageId, _Reply) ->
ok;
agencyReTimeout(timeOut, _MessageId, _Reply) ->
ok;
agencyReTimeout(FormPid, MessageId, Reply) ->
catch FormPid ! #agReqRet{messageId = MessageId, reply = Reply},
ok.
-spec agencyReplyAll(term()) -> ok.
agencyReplyAll(Reply) ->
[agencyReply(PidForm, MessageId, TimeRef, Reply) || {MessageId, {PidForm, TimeRef, _, _}} <- erlang:get()],
ok.
-spec cancelTimer(undefined | reference()) -> ok.
@ -80,16 +82,11 @@ initReConnState(IsReconnect, Min, Max) ->
end.
-spec resetReConnState(undefined | reConnState()) -> reConnState() | undefined.
resetReConnState(#reConnState{min = Min} = ReconnectState) ->
ReconnectState#reConnState{current = Min}.
resetReConnState(#reConnState{min = Min} = ReConnState) ->
ReConnState#reConnState{current = Min}.
-spec updateReConnState(reConnState()) -> reConnState().
updateReConnState(#reConnState{current = Current, max = Max} = ReconnectState) ->
updateReConnState(#reConnState{current = Current, max = Max} = ReConnState) ->
NewCurrent = Current + Current,
ReconnectState#reConnState{current = minCur(NewCurrent, Max)}.
minCur(A, B) when B >= A ->
A;
minCur(_, B) ->
B.
ReConnState#reConnState{current = if NewCurrent >= Max -> Max; true -> NewCurrent end}.

+ 4
- 3
src/agVstCli/agMiscUtils.erl 查看文件

@ -55,16 +55,17 @@ dbOpts(DbCfgs) ->
PoolSize = ?AgGetListKV(poolSize, DbCfgs, ?AgDefPoolSize),
SocketOpts = ?AgGetListKV(socketOpts, DbCfgs, ?AgDefSocketOpts),
DbOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(<<User/binary, ":", Password/binary>>))/binary>>},
DbOpts#dbOpts{dbName = <<"/_db/", DbName/binary>>, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
DbOpts#dbOpts{dbName = <<"/_db/", DbName/binary>>, user = User, password = Password, poolSize = PoolSize, socketOpts = SocketOpts}.
-spec agencyOpts(list()) -> agencyOpts().
agencyOpts(AgencyCfgs) ->
IsReconnect = ?AgGetListKV(reconnect, AgencyCfgs, ?AgDefIsReConn),
VstSize = ?AgGetListKV(vstSize, AgencyCfgs, ?AgDefVstSize),
AgencySlg = ?AgGetListKV(agencySlg, AgencyCfgs, ?AgDefAgencySlg),
BacklogSize = ?AgGetListKV(backlogSize, AgencyCfgs, ?AgDefBacklogSize),
Min = ?AgGetListKV(reConnTimeMin, AgencyCfgs, ?AgDefReConnMin),
Max = ?AgGetListKV(reConnTimeMax, AgencyCfgs, ?AgDefReConnMax),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}.
#agencyOpts{reconnect = IsReconnect, vstSize = VstSize, agencySlg = AgencySlg, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}.
-spec getListValue(term(), list(), term()) -> term().
getListValue(Key, List, Default) ->

src/agVstCli/agSslAgencyExm.erl → src/agVstCli/agSslAgencyExm.erl1 查看文件


src/agVstCli/agSslAgencyIns.erl → src/agVstCli/agSslAgencyIns.erl1 查看文件

@ -15,7 +15,7 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
self() ! ?AgMDoNetConn,
self() ! ?AgMDoDBConn,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reConnState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
@ -97,7 +97,7 @@ handleMsg({timeout, TimerRef, mWaitingOver},
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% 之前的数据超时之后 要关闭ssl 然后重新建立连接 以免后面该ssl收到该次超时数据 影响后面请求的接收数据 导致数据错乱
ssl:close(Socket),
handleMsg(?AgMDoNetConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg(?AgMDoDBConn, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({ssl_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
@ -111,7 +111,7 @@ handleMsg({ssl_error, Socket, Reason},
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
ssl:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {ssl_error, Reason}});
handleMsg(?AgMDoNetConn,
handleMsg(?AgMDoDBConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of

+ 33
- 12
src/agVstCli/agTcpAgencyIns.erl 查看文件

@ -14,7 +14,7 @@
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reConnTimeMin = Min, reConnTimeMax = Max}}) ->
self() ! ?AgMDoNetConn,
self() ! ?AgMDoDBConn,
ReConnState = agAgencyUtils:initReConnState(Reconnect, Min, Max),
{ok, #srvState{poolName = PoolName, serverName = AgencyName, reConnState = ReConnState}, #cliState{backlogSize = BacklogSize}}.
@ -66,7 +66,10 @@ handleMsg({tcp, _Socket, Data}, SrvState,
end;
handleMsg({timeout, _TimerRef, {mWaitingOver, MessageId, FromPid}}, SrvState,
#cliState{backlogNum = BacklogNum} = CliState) ->
agAgencyUtils:agencyReply(FromPid, MessageId, undefined, {error, timeout}),
MsgCache = erlang:get(MessageId),
MsgPF = erlang:setelement(?AgPFIdx, MsgCache, timeOut),
erlang:put(MessageId, MsgPF),
agAgencyUtils:agencyReTimeout(FromPid, MessageId, {error, timeout}),
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
@ -80,15 +83,25 @@ handleMsg({tcp_error, Socket, Reason},
?AgWarn(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?AgMDoNetConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = ReconnectState} = SrvState,
handleMsg(?AgMDoDBConn,
#srvState{poolName = PoolName, serverName = ServerName, reConnState = _ReConnState} = SrvState,
CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
#dbOpts{port = Port, hostname = HostName, dbName = DbName, user = User, password = Password, socketOpts = SocketOpts} ->
case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
%% IMY-todo
{ok, Socket};
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
gen_tcp:send(Socket, AuthInfo),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
{ok, SrvState#srvState{dbName = DbName, socket = Socket}, CliState};
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
agAgencyUtils:reConnTimer(SrvState, CliState)
end;
{error, Reason} ->
?AgWarn(ServerName, "connect error: ~p~n", [Reason]),
agAgencyUtils:reConnTimer(SrvState, CliState)
@ -102,12 +115,20 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason, #srvState{socket = Socket} = SrvState, CliState) ->
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
{ok, NewSrvState, NewCliState} = waitAllReqOver(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->
KVList = erlang:get(),
ok.
-spec waitAllReqOver(srvState(), cliState()) -> {ok, srvState(), cliState()}.
waitAllReqOver(SrvState, #cliState{backlogNum = BacklogNum} = CliState) ->
case BacklogNum > 0 of
true ->
receive
Msg ->
{ok, NewSrvState, NewCliState} = handleMsg(Msg, SrvState, CliState),
waitAllReqOver(NewSrvState, NewCliState)
end;
_ ->
{ok, SrvState, CliState}
end.

+ 106
- 99
src/agVstCli/agVstCli.erl 查看文件

@ -7,14 +7,14 @@
-export([
%% Common Request API
callAgency/5
, callAgency/6
callAgency/6
, callAgency/7
, castAgency/5
, callAgency/8
, castAgency/6
, castAgency/7
, castAgency/8
, receiveRequestRet/2
, castAgency/9
, receiveReqRet/2
%% Pools API
, startPool/2
@ -22,45 +22,47 @@
, stopPool/1
%% Single Process DbAPI
, connectDb/1
, disConnectDb/1
, connDb/1
, disConnDb/1
, getCurDbInfo/1
, useDatabase/2
,receiveTcpData/2
,receiveSslData/2
]).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, false, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body()) -> term() | {error, term()}.
callAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body) ->
callAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, false, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body(), boolean()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, IsSystem) ->
callAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, IsSystem, ?AgDefTimeout).
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
case castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout) of
-spec callAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, IsSystem, Timeout) ->
case castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, self(), IsSystem, Timeout) of
{waitRRT, RequestId, MonitorRef} ->
receiveRequestRet(RequestId, MonitorRef);
receiveReqRet(RequestId, MonitorRef);
{error, _Reason} = Err ->
Err;
Ret ->
Ret
end.
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body) ->
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, self(), false, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body(), boolean()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, IsSystem) ->
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, self(), IsSystem, ?AgDefTimeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout).
-spec castAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, IsSystem, Timeout) ->
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, self(), IsSystem, Timeout).
-spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
-spec castAgency(poolNameOrSocket(), method(), path(), queryPars(), headers(), body(), pid(), boolean(), timeout()) -> {ok, messageId()} | {error, atom()}.
castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSystem, Timeout) ->
OverTime =
case Timeout of
@ -89,19 +91,19 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
ok ->
receiveTcpData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
receiveTcpData(#recvState{}, PoolNameOrSocket);
{error, Reason} = Err ->
?AgWarn(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
disConnDb(PoolNameOrSocket),
Err
end;
ssl ->
case ssl:send(PoolNameOrSocket, Request) of
ok ->
receiveSslData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
receiveSslData(#recvState{}, PoolNameOrSocket);
{error, Reason} = Err ->
?AgWarn(castAgency, ":ssl send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
disConnDb(PoolNameOrSocket),
Err
end
end;
@ -110,8 +112,8 @@ castAgency(PoolNameOrSocket, Method, Path, QueryPars, Headers, Body, Pid, IsSyst
end
end.
-spec receiveRequestRet(messageId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: binary(), Headers :: binary()} | {error, term()}.
receiveRequestRet(RequestId, MonitorRef) ->
-spec receiveReqRet(messageId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: binary(), Headers :: binary()} | {error, term()}.
receiveReqRet(RequestId, MonitorRef) ->
receive
#agReqRet{messageId = RequestId, reply = Reply} ->
erlang:demonitor(MonitorRef),
@ -130,62 +132,48 @@ receiveRequestRet(RequestId, MonitorRef) ->
{error, {agencyDown, Reason}}
end.
-spec receiveTcpData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
-spec receiveTcpData(recvState() | undefined, socket()) -> {ok, term(), term()} | {error, term()}.
receiveTcpData(RecvState, Socket) ->
receive
{tcp, Socket, Data} ->
case agVstProtoSp:response(?AgUndef, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->
{ok, #{}, StatusCode, Headers};
_ ->
{ok, jiffy:decode(Body, [return_maps, copy_strings]), StatusCode, Headers}
end;
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?AgWarn(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {tcpDataError, Reason}}
{tcp, Socket, DataBuffer} ->
?AgWarn(1111, "receove : ~p~n", [DataBuffer]),
case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
receiveTcpData(NewRecvState, Socket);
{?AgCBodyStart, NewRecvState} ->
receiveTcpData(NewRecvState, Socket);
{?AgCBodyGoOn, NewRecvState} ->
receiveTcpData(NewRecvState, Socket)
end;
{tcp_closed, Socket} ->
disConnectDb(Socket),
disConnDb(Socket),
{error, tcp_closed};
{tcp_error, Socket, Reason} ->
disConnectDb(Socket),
disConnDb(Socket),
{error, {tcp_error, Reason}}
end.
-spec receiveSslData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
-spec receiveSslData(recvState() | undefined, socket()) -> {ok, term(), term()} | {error, term()}.
receiveSslData(RecvState, Socket) ->
receive
{ssl, Socket, Data} ->
try agVstProtoPl:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
case Body of
<<>> ->
{ok, #{}, StatusCode, Headers};
_ ->
{ok, jiffy:decode(Body, [return_maps, copy_strings]), StatusCode, Headers}
end;
{ok, NewRecvState} ->
receiveSslData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?AgWarn(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
{error, {sslDataError, Reason}}
catch
E:R:S ->
?AgWarn(receiveSslData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
disConnectDb(Socket),
{error, handledataError}
{ssl, Socket, DataBuffer} ->
case agVstProtoSp:response(element(1, RecvState), RecvState, DataBuffer) of
{?AgMDone, MsgBin} ->
{ok, MsgBin};
{?AgCHeader, NewRecvState} ->
receiveTcpData(NewRecvState, Socket);
{?AgCBodyStart, NewRecvState} ->
receiveTcpData(NewRecvState, Socket);
{?AgCBodyGoOn, NewRecvState} ->
receiveTcpData(NewRecvState, Socket)
end;
{ssl_closed, Socket} ->
disConnectDb(Socket),
disConnDb(Socket),
{error, ssl_closed};
{ssl_error, Socket, Reason} ->
disConnectDb(Socket),
disConnDb(Socket),
{error, {ssl_error, Reason}}
end.
@ -201,47 +189,66 @@ startPool(PoolName, DbCfgs, AgencyCfgs) ->
stopPool(PoolName) ->
agAgencyPoolMgrIns:stopPool(PoolName).
-spec connectDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
connectDb(DbCfgs) ->
-spec connDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
connDb(DbCfgs) ->
#dbOpts{
host = Host,
port = Port,
hostname = HostName,
dbName = DbName,
protocol = Protocol,
userPassword = UserPassword,
user = User,
password = Password,
socketOpts = SocketOpts
} = agMiscUtils:dbOpts(DbCfgs),
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case Protocol of
tcp ->
case gen_tcp:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout) of
case gen_tcp:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
gen_tcp:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
gen_tcp:send(Socket, AuthInfo),
inet:getopts(Socket, [active]),
AA = inet:getopts(Socket, [active]),
?AgWarn(auth, "connect opt: ~p~n", [AA]),
case agVstCli:receiveTcpData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
setCurDbInfo(Socket, DbName, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end;
ssl ->
case ssl:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout) of
case ssl:connect(HostName, Port, SocketOpts, ?AgDefConnTimeout) of
{ok, Socket} ->
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
{ok, Socket};
ssl:send(Socket, ?AgUpgradeInfo),
AuthInfo = eVPack:encode([1, 1000, <<"plain">>, User, Password]),
ssl:send(Socket, AuthInfo),
case agVstCli:receiveSslData(#recvState{}, Socket) of
{ok, MsgBin} ->
Term = eVPack:decode(MsgBin),
?AgWarn(auth, "connect and auth success: ~p~n", [Term]),
setCurDbInfo(Socket, DbName, Protocol),
{ok, Socket};
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end;
{error, Reason} = Err ->
?AgWarn(connectDb, "connect error: ~p~n", [Reason]),
Err
end
end;
{error, Reason} = Err ->
?AgWarn(connectDb, "getaddrs error: ~p~n", [Reason]),
Err
end.
-spec disConnectDb(socket()) -> ok | {error, term()}.
disConnectDb(Socket) ->
-spec disConnDb(socket()) -> ok | {error, term()}.
disConnDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
ignore;
@ -254,9 +261,9 @@ disConnectDb(Socket) ->
end
end.
-spec setCurDbInfo(socket(), binary(), tuple(), host(), protocol()) -> term().
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, UserPassword, Host, Protocol}).
-spec setCurDbInfo(socket(), binary(), protocol()) -> term().
setCurDbInfo(Socket, DbName, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, Protocol}).
-spec getCurDbInfo(socket()) -> term().
getCurDbInfo(Socket) ->
@ -267,7 +274,7 @@ useDatabase(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->
ignore;
{_DbName, UserPassword, Host, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {<<"/_db/", NewDbName/binary>>, UserPassword, Host, Protocol})
{_DbName, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {<<"/_db/", NewDbName/binary>>, Protocol})
end,
ok.

+ 14
- 21
src/agVstCli/agVstProtoPl.erl 查看文件

@ -15,11 +15,11 @@ request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
-spec response(AgStatus :: pos_integer(), DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
{?AgUndef, DoneCnt :: pos_integer()} |
{?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}.
{?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary()}.
response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
@ -57,7 +57,7 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
{?AgCBodyStart, DoneCnt, MessageId, ChunkX, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
@ -68,7 +68,7 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, ChunkBin),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
@ -126,7 +126,7 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
{?AgCBodyStart, DoneCnt, MessageId, ChunkX, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
@ -137,7 +137,7 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, ChunkBin),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
@ -167,30 +167,23 @@ response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuf
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
ChunkIdx < ChunkCnt ->
true ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
{?AgUndef, DoneCnt}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, DoneCnt, NewCkBuffer};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = NewCkBuffer,
if
ChunkIdx >= ChunkCnt ->
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LastChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin);
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LastChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin);
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, ChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
end.

+ 26
- 121
src/agVstCli/agVstProtoSp.erl 查看文件

@ -6,7 +6,7 @@
-export([
request/7
, response/7
, response/3
]).
-spec request(boolean(), method(), binary(), path(), queryPars(), headers(), body()) -> iolist().
@ -15,12 +15,12 @@ request(false, Method, DbName, Path, QueryPars, Headers, Body) ->
request(_, Method, _DbName, Path, QueryPars, Headers, Body) ->
[eVPack:encode([1, 1, <<"/_db/_system">>, Method, Path, QueryPars, Headers]), Body].
-spec response(AgStatus :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary(), Data :: binary()) ->
{?AgUndef, DoneCnt :: pos_integer()} |
{?AgCHeader, DoneCnt :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyStart, DoneCnt :: pos_integer(), MessageId :: pos_integer(), ChunkIdx :: pos_integer(), ChunkSize :: pos_integer(), ChunkBuffer :: binary()} |
{?AgCBodyGoOn, DoneCnt :: pos_integer(), ChunkBuffer :: binary}.
response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, DataBuffer) ->
-spec response(AgStatus :: pos_integer(), RecvState :: recvState(), DataBuffer :: binary()) ->
{?AgMDone, MsgBin :: binary()} |
{?AgCHeader, RecvState :: recvState()} |
{?AgCBodyStart, RecvState :: recvState()} |
{?AgCBodyGoOn, RecvState :: recvState()}.
response(?AgUndef, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer} = RecvState, DataBuffer) ->
case DataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
ByteSize = erlang:byte_size(LeftBuffer),
@ -30,65 +30,29 @@ response(?AgUndef, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, _ChunkBuffer, Dat
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, DoneCnt, DataBuffer}
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = DataBuffer}}
end;
response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, DataBuffer) ->
response(?AgCHeader, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewDataBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
case NewDataBuffer of
<<Length:32/integer-little-unsigned, ChunkX:31/integer-little-unsigned, IsFirst:1/integer-little-unsigned, MessageId:64/integer-little-unsigned, _MessageLength:64/integer-little-unsigned, LeftBuffer/binary>> ->
@ -99,98 +63,39 @@ response(?AgCHeader, DoneCnt, _MessageId, _ChunkIdx, _ChunkSize, ChunkBuffer, Da
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, LeftBuffer),
{?AgUndef, DoneCnt + 1};
{?AgMDone, LeftBuffer};
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgUndef, DoneCnt};
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, chunkCnt = ChunkX, msgBuffer = LeftBuffer}};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LeftBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
{?AgMDone, <<MsgBuffer/binary, LeftBuffer/binary>>};
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LeftBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt}
{?AgCDone, #recvState{revStatus = ?AgUndef, messageId = MessageId, msgBuffer = <<MsgBuffer/binary, LeftBuffer/binary>>}}
end
end;
ByteSize < ChunkSize ->
if
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgCC = erlang:setelement(?AgCCIdx, MsgCache, ChunkX),
erlang:put(MessageId, MsgCC),
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer};
true ->
{?AgCBodyStart, DoneCnt, MessageId, 1, ChunkSize, LeftBuffer}
end;
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<ChunkBin:ChunkSize/binary, NextBuffer/binary>> = LeftBuffer,
if
IsFirst == ChunkX ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
IsFirst == 1 ->
MsgCache = erlang:get(MessageId),
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, LeftBuffer),
MsgCC = erlang:setelement(?AgCCIdx, MsgMB, ChunkX),
erlang:put(MessageId, MsgCC),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer);
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, messageId = MessageId, chunkCnt = ChunkX, chunkIdx = 1, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
case ChunkX >= ChunkCnt of
true ->
agAgencyUtils:agencyReply(MessageId, ChunkBin),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, NextBuffer);
_ ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, ChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, NextBuffer)
end
{?AgCBodyStart, RecvState#recvState{revStatus = ?AgCBody, chunkIdx = ChunkX, chunkSize = ChunkSize, chunkBuffer = LeftBuffer}}
end
end;
_ ->
{?AgCHeader, DoneCnt, NewDataBuffer}
{?AgCHeader, RecvState#recvState{revStatus = ?AgCHeader, chunkBuffer = NewDataBuffer}}
end;
response(?AgCBody, DoneCnt, MessageId, ChunkIdx, ChunkSize, ChunkBuffer, DataBuffer) ->
response(?AgCBody, #recvState{chunkCnt = ChunkCnt, msgBuffer = MsgBuffer, chunkIdx = ChunkIdx, chunkSize = ChunkSize, chunkBuffer = ChunkBuffer} = RecvState, DataBuffer) ->
NewCkBuffer = <<ChunkBuffer/binary, DataBuffer/binary>>,
ByteSize = erlang:byte_size(NewCkBuffer),
if
ChunkSize == ByteSize ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
if
ChunkIdx >= ChunkCnt ->
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, NewCkBuffer/binary>>),
{?AgUndef, DoneCnt + 1};
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, NewCkBuffer/binary>>),
erlang:put(MessageId, MsgMB),
{?AgUndef, DoneCnt};
{?AgMDone, <<MsgBuffer/binary, NewCkBuffer/binary>>};
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
{?AgCDone, RecvState#recvState{revStatus = ?AgUndef, msgBuffer = <<MsgBuffer/binary, NewCkBuffer/binary>>}}
end;
ByteSize < ChunkSize ->
{?AgCBodyGoOn, DoneCnt, NewCkBuffer};
true ->
{_PidFrom, _TimerRef, ChunkCnt, MsgBuffer} = MsgCache = erlang:get(MessageId),
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
if
ChunkIdx >= ChunkCnt ->
<<LastChunkBin:ChunkSize/binary, LeftBin/binary>> = NewCkBuffer,
agAgencyUtils:agencyReply(MessageId, <<MsgBuffer/binary, LastChunkBin/binary>>),
response(?AgUndef, DoneCnt + 1, 0, 0, 0, <<>>, LeftBin);
ChunkIdx < ChunkCnt ->
MsgMB = erlang:setelement(?AgMBIdx, MsgCache, <<MsgBuffer/binary, LastChunkBin/binary>>),
erlang:put(MessageId, MsgMB),
response(?AgUndef, DoneCnt, 0, 0, 0, <<>>, LeftBin);
true ->
?AgWarn(agVstProtocol_response_body, "there is not should come 11 ~p ~p ~n", [ByteSize, ChunkSize]),
{error, error_bad_chunkIdx}
end
{?AgCBodyGoOn, RecvState#recvState{chunkBuffer = NewCkBuffer}}
end.

src/agVstCli/vst.erl → src/agVstCli/vst.erl1 查看文件


+ 2
- 2
src/eArango.app.src 查看文件

@ -1,9 +1,9 @@
{application, erlArango,
{application, eArango,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{registered, []},
{mod, {eArango_app, []}},
{applications, [kernel, stdlib, jiffy]},
{applications, [kernel, stdlib]},
{env, []},
{modules, []},
{licenses, ["MIT License"]},

+ 3
- 3
src/user_default.erl 查看文件

@ -5,11 +5,11 @@
start() ->
erlSync:run(),
application:ensure_all_started(erlArango),
application:ensure_all_started(eArango),
agVstCli:startPool(tt, [{poolSize, 10}], []).
tt(C, N) ->
application:ensure_all_started(erlArango),
application:ensure_all_started(eArango),
agVstCli:startPool(tt, [{poolSize, 16}], []),
StartTime = erlang:system_time(millisecond),
io:format("IMY********************** started~n"),
@ -26,7 +26,7 @@ test(N, StartTime) ->
test(N - 1, StartTime).
%% tt(C, N) ->
%% application:start(erlArango),
%% application:start(eArango),
%% agHttpCli:startPool(tt, [{poolSize, 1}, {baseUrl, <<"http://localhost:8181">>}], []),
%% Request = {<<"GET">>, <<"/_api/database/current">>, [], []},
%% io:format("IMY********************** start time ~p~n",[erlang:system_time(millisecond)]),

正在加载...
取消
保存