@ -0,0 +1,286 @@ | |||
Client / Server Communication (VST 1.1) | |||
======================================= | |||
Version 1.1.0 of 23 November 2016 | |||
HTTP | |||
---- | |||
Use VelocyPack as body. Content-Type is `"application/vpack"` | |||
Binary Protocol | |||
--------------- | |||
This is not a request / response protocol. It is symmetric (in | |||
principle). Messages can be sent back and forth, pipelined, multiplexed, | |||
uni-directional or bi-directional. | |||
It is possible that a message generates | |||
- no response | |||
- exactly one response | |||
- multiple responses | |||
The VelocyStream does **not** impose or specify or require one of the | |||
above behaviors. The application must define a behavior in general or on | |||
a per-request basis, see below. The server and client must then | |||
implement that behavior. | |||
### Message vs. Chunk | |||
The consumer (client or server) will deal with messages. A message | |||
consists of one or more VelocyPacks (or in some cases of certain parts | |||
of binary data). How many VelocyPacks are part of a message is | |||
completely application dependent, see below for ArangoDB. | |||
It is possible that the messages are very large. As messages can be | |||
multiplexed over one connection, large messages need to be split into | |||
chunks. The sender/receiver class will accept a vector of VelocyPacks, | |||
split them into chunks, send these chunks over the wire, assemble these | |||
chunks, generates a vector of VelocyPacks and passes this to the | |||
consumer. | |||
### Chunks | |||
In order to allow reassemble chunks, each package is prefixed by a small | |||
header. A chunk is always at least 24 bytes long. The byte order is | |||
ALWAYS little endian. The format of a chunk is the following, regardless | |||
on whether it is the first in a message or a subsequent one: | |||
| name | type | description | | |||
| --------------- | ------------------------- | --- | | |||
| length | uint32\_t | total length in bytes of the current chunk, including this header | | |||
| chunkX | uint32\_t | chunk/isFirstChunk (upper 31bits/lowest bit), details see below | | |||
| messageId | uint64\_t | a unique identifier, it is the responsibility of the sender to generate such an identifier (zero is reserved for not set ID) | | |||
| messageLength | uint64\_t | the total size of the message. | | |||
| Binary data | binary data blob | size b1 | | |||
Clarification: "chunk" and "isFirstChunk" are combined into an unsigned | |||
32bit value. Therefore it will be encoded as | |||
uint32_t chunkX | |||
and extracted as | |||
chunk = chunkX >> 1 | |||
isFirstChunk = chunkX & 0x1 | |||
For the first chunk of a message, the low bit of the second uint32\_t is | |||
set, for all subsequent ones it is reset. In the first chunk of a | |||
message, the number "chunk" is the total number of chunks in the | |||
message, in all subsequent chunks, the number "chunk" is the current | |||
number of this chunk. | |||
The total size of the data package is (24 + b1) bytes. This number is | |||
stored in the length field. If one needs to send messages larger than | |||
UINT32\_MAX, then these messages must be chunked. In general it is a | |||
good idea to restrict the maximal size to a few megabytes. | |||
**Notes:** | |||
When sending a (small) message, it is important (for performance reasons) | |||
to ensure that only one TCP | |||
packet is sent. For example, by using sendmmsg under Linux | |||
([*https://blog.cloudflare.com/how-to-receive-a-million-packets/*](https://blog.cloudflare.com/how-to-receive-a-million-packets/)) | |||
Implementors should nevertheless be aware that in TCP/IP one cannot | |||
enforce this and so implementations must always be aware that some part | |||
of the network stack can split packets and the payload might arrive in | |||
multiple parts! | |||
ArangoDB | |||
======== | |||
### Request / Response | |||
For an ArangoDB client, the request is of the following format, the | |||
array is a VelocyPack array: | |||
[ | |||
/* 0 - version: */ 1, // [int] | |||
/* 1 - type: */ 1, // [int] 1=Req, 2=Res,.. | |||
/* 2 - database: */ "test", // [string] | |||
/* 3 - requestType: */ 1, // [int] 0=Delete, ... | |||
/* 4 - request: */ "/_api/collection", // [string\] | |||
/* 5 - parameter: */ { force: true }, // [[string]->[string]] | |||
/* 6 - meta: */ { x-arangodb: true } // [[string]->[string]] | |||
] | |||
Body (binary data) | |||
If database is missing (entry is `null`), then "\_system" is assumed. | |||
`type`: | |||
1 = Request | |||
2 = Response (final response for this message id) | |||
3 = Response (but at least one more response will follow) | |||
1000 = Authentication | |||
`requestType`: | |||
0 = DELETE | |||
1 = GET | |||
2 = POST | |||
3 = PUT | |||
4 = HEAD (not used in VPP) | |||
5 = PATCH | |||
6 = OPTIONS (not used in VPP) | |||
For example: | |||
The HTTP request | |||
http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3 | |||
With header: | |||
X-ArangoDB-Async: true | |||
is equivalent to | |||
[ | |||
1, // version | |||
1, // type | |||
"test", // database | |||
1, // requestType GET | |||
"/_admin/echo", // request path | |||
{ // parameters | |||
a: 1, | |||
b: 2, | |||
c: [ 1, 3 ] | |||
}, | |||
{ // meta | |||
x-arangodb-async: true | |||
} | |||
] | |||
The request is a message beginning with one VelocyPack. This VelocyPack | |||
always contains the header fields, parameters and request path. If the | |||
meta field does not contain a content type, then the default | |||
`"application/vpack"` is assumed and the body will be one or multiple | |||
VelocyPack object. | |||
The response will be | |||
[ | |||
1, // 0 - version | |||
2 or 3, // 1 - type | |||
400, // 2 - responseCode | |||
{ etag: "1234" } // 3 - meta: [[str]->[str]] | |||
] | |||
Body (binary data) | |||
Request can be pipelined or mixed. The responses are mapped using the | |||
"messageId" in the header. It is the responsibility of the **sender** to | |||
generate suitable "messageId" values. | |||
The default content-type is `"application/vpack"`. | |||
### Authentication | |||
A connection can be authenticated with the following message: | |||
[ | |||
1, // version | |||
1000, // type | |||
"plain", // encryption | |||
"admin", // user | |||
"plaintext", // password | |||
] | |||
or | |||
[ | |||
1, // version | |||
1000, // type | |||
"jwt", // encryption | |||
"abcd..." // token | |||
] | |||
The response is | |||
{ "error": false } | |||
if successful or | |||
{ | |||
"error": true, | |||
"errorMessage": "MESSAGE", | |||
"errorCode": CODE | |||
} | |||
if not successful, and in this case the connection is closed by the server. | |||
One can acquire a JWT token in the same way as with HTTP using the | |||
open, unauthenticated route `/_open/auth` with the same semantics as | |||
in the HTTP version. In this way, the complete authentication can be | |||
done in a single session via JWT. | |||
### Content-Type and Accept | |||
In general the content-type will be VPP, that is the body is an object | |||
stored as VelocyPack. | |||
Sometimes it is necessary to respond with unstructured data, like text, | |||
css or html. The body will be a VelocyPack object containing just a | |||
binary attribute and the content-type will be set accordingly. | |||
The rules are as follows. | |||
#### Http | |||
Request: Content-Type | |||
- `"application/json"`: the body contains the JSON string representation | |||
- `"application/vpack"`: the body contains a velocy pack | |||
There are some handler that allow lists of JSON (seperared by newline). | |||
In this case we also allow multiple velocy packs without any separator. | |||
Request: Accept | |||
- `"application/json"`: send a JSON string representation in the body, | |||
if possible | |||
- `"application/vpack"`: send velocy pack in the body, if possible | |||
If the request asked for `"application/json"` or `"application/vpack"` and | |||
the handler produces something else (i.e. `"application/html"`), then the | |||
accept is ignored. | |||
If the request asked `"application/json"` and the handler produces | |||
`"application/vpack"`, then the VPACK is converted into JSON. | |||
If the request asked `"application/vpack"` and the handler produces | |||
"application/json", then the JSON is converted into VPACK. | |||
#### VPP | |||
Similar to HTTP with the exception: the "Accept" header is not supported | |||
and `"application/json"` will always be converted into | |||
"application/vpack". This means that the body contains one or more | |||
velocy-packs. In general it will contain one - notable exception being | |||
the import. | |||
If the handler produces something else (i.e. `"application/html"`), then | |||
The body will be a binary blob (instead of a velocy-pack) and the | |||
content-type will be set accordingly. | |||
The first bytes sent after a connection (the "client" side - even if the | |||
program is bi-directional, there is a server listening to a port and a | |||
client connecting to a port) are | |||
VST/1.1\r\n\r\n | |||
(11 Bytes) | |||
@ -0,0 +1,188 @@ | |||
Client / Server Communication (VST 1.1) | |||
======================================= | |||
Version 1.1.0 of 23 November 2016 | |||
# HTTP | |||
使用VelocyPack作为身体。内容类型为"application/vpack" | |||
Binary Protocol | |||
--------------- | |||
这不是请求/响应协议。它是对称的(原则上)。消息可以来回发送,流水线,多路复用,单向或双向。 | |||
可能会生成一条消息 | |||
没有反应 | |||
只是一个回应 | |||
多重回应 | |||
该VelocyStream并没有强加或指定或要求上述行为之一。 | |||
应用程序必须在一般情况下或根据每个请求定义行为, | |||
请参见下文。然后,服务器和客户端必须实现该行为。 | |||
### Message vs. Chunk | |||
使用者(客户端或服务器)将处理消息。一条消息包含一个或多个VelocyPack(在某些情况下是二进制数据的某些部分)。 | |||
消息中有多少VelocyPacks完全取决于应用程序,请参阅下文了解ArangoDB。 | |||
消息可能很大。由于可以通过一个连接对消息进行多路复用,因此需要将大消息拆分为多个块。 | |||
发送者/接收者类将接受一个VelocyPacks向量,将其分割成块,通过导线发送这些块,组装这些块, | |||
生成一个VelocyPacks向量并将其传递给消费者。 | |||
### Chunks | |||
为了允许重组块,每个程序包都以一个 header 作为前缀。块始终至少为24个字节长。字节顺序始终是小端。 | |||
块的格式如下,无论它是消息中的第一个消息还是随后的消息: | |||
名称 类型 描述 | |||
length uint32_t 当前块(包括此标头)的总长度(以字节为单位) | |||
chunkX uint32_t chunk / isFirstChunk(高31位/最低位),详细信息请参见下文 | |||
messageId uint64_t 唯一标识符,发送方有责任生成这样的标识符(zero is reserved for not set ID) | |||
messageLength uint64_t the total size of the message. | |||
Binary data binary data blob size b1 | | |||
声明:"chunk" and "isFirstChunk" are combined into an unsigned 32bit value. | |||
Therefore it will be encoded as | |||
uint32_t chunkX and extracted as | |||
chunk = chunkX >> 1 | |||
isFirstChunk = chunkX & 0x1 | |||
对于消息的第一块,设置第二个uint32_t的低位,对于所有后续消息,将其复位。 | |||
在消息的第一个块中,数字 chunk 是消息中的块总数,在所有后续块中,数字 chunk 是该块的当前number。 | |||
数据包的总大小为(24 + b1)字节。此数字存储在length字段中。 | |||
如果需要发送大于UINT32_MAX的消息,则必须将这些消息分块。通常,最好将最大大小限制为几兆字节。 | |||
### **Notes:** | |||
发送(小)消息时,(出于性能原因)确保仅发送一个TCP数据包非常重要。 | |||
例如,通过在Linux下使用sendmmsg | |||
([https://blog.cloudflare.com/how-to-receive-a-million-packets](https://blog.cloudflare.com/how-to-receive-a-million-packets/)) | |||
但是,实现者应该意识到,在TCP / IP中不能强制执行此操作,因此实现者必须始终意识到, | |||
网络堆栈的某些部分可以拆分数据包,并且有效负载可能分成多个部分! | |||
ArangoDB | |||
======== | |||
### Request / Response | |||
对于ArangoDB客户端,请求的格式如下,该数组是VelocyPack数组: | |||
[ | |||
/* 0 - version: */ 1, // [int] | |||
/* 1 - type: */ 1, // [int] 1=Req, 2=Res,.. | |||
/* 2 - database: */ "test", // [string] | |||
/* 3 - requestType: */ 1, // [int] 0=Delete, ... | |||
/* 4 - request: */ "/_api/collection", // [string\] | |||
/* 5 - parameter: */ { force: true }, // [[string]->[string]] http的请求参数列表 | |||
/* 6 - meta: */ { x-arangodb: true } // [[string]->[string]] http的header | |||
] | |||
Body (binary data) | |||
如果数据库未设置(entry is `null`),则数据库为“ _system”。 | |||
#### type: | |||
1 = Request | |||
2 = Response (final response for this message id) | |||
3 = Response (but at least one more response will follow) | |||
1000 = Authentication | |||
#### requestType: | |||
0 = DELETE | |||
1 = GET | |||
2 = POST | |||
3 = PUT | |||
4 = HEAD (not used in VPP) | |||
5 = PATCH | |||
6 = OPTIONS (not used in VPP) | |||
### For example: | |||
HTTP请求 | |||
http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3 | |||
With header:: | |||
X-ArangoDB-Async: true | |||
is equivalent to | |||
[ | |||
1, // version | |||
1, // type | |||
"test", // database | |||
1, // requestType GET | |||
"/_admin/echo", // request path | |||
{ // parameters | |||
a: 1, | |||
b: 2, | |||
c: [ 1, 3 ] | |||
}, | |||
{ // meta | |||
x-arangodb-async: true | |||
} | |||
] | |||
该请求是一条以VelocyPack开头的消息。这个VelocyPack始终包含标题字段,参数和请求路径。 | |||
如果meta字段不包含内容类型,则采用默认值 "application/vpack",并且正文将是一个或多个VelocyPack对象。 | |||
#### The response will be | |||
[ | |||
1, // 0 - version | |||
2 or 3, // 1 - type | |||
400, // 2 - responseCode | |||
{ etag: "1234" } // 3 - meta: [[str]->[str]] | |||
] | |||
#### Body (binary data) | |||
请求可以通过管道传输或混合。使用标头中的“ messageId”映射响应。发送者有责任生成合适的“ messageId”值。 | |||
默认内容类型为"application/vpack"。 | |||
### Authentication | |||
A connection can be authenticated with the following message: | |||
[ | |||
1, // version | |||
1000, // type | |||
"plain", // encryption | |||
"admin", // user | |||
"plaintext", // password | |||
] | |||
or | |||
[ | |||
1, // version | |||
1000, // type | |||
"jwt", // encryption | |||
"abcd..." // token | |||
] | |||
The response is | |||
{ "error": false } | |||
if successful or | |||
{ | |||
"error": true, | |||
"errorMessage": "MESSAGE", | |||
"errorCode": CODE | |||
} | |||
如果不成功,则在这种情况下服务器将关闭连接。可以使用/_open/auth与HTTP版本相同的语义,使用未经身份验证的开放式路由,以与HTTP相同的方式获取JWT令牌。这样,可以通过JWT在单个会话中完成完整的身份验证。 | |||
### Content-Type and Accept | |||
通常,内容类型将是VPP,即主体是存储为VelocyPack的对象。 | |||
有时有必要使用非结构化数据(例如文本,css或html)进行响应。主体将是一个仅包含二进制属性的VelocyPack对象,并将相应地设置content-type。 | |||
规则如下。 | |||
#### Http | |||
Request: Content-Type | |||
"application/json"`: the body contains the JSON string representation | |||
`"application/vpack"`: the body contains a velocy pack | |||
有一些处理程序允许JSON列表(由换行符分隔)。在这种情况下,我们还允许不使用任何分隔符的多个速度包。 | |||
Request: Accept | |||
"application/json":如果可能,在正文中发送JSON字符串表示形式 | |||
"application/vpack":如果可能的话,在体内发送速度包 | |||
如果请求是"application/json"或"application/vpack"处理程序产生了其他请求(即"application/html"),那么将忽略接受。 | |||
如果请求被请求"application/json"并且处理程序产生 "application/vpack",则VPACK将转换为JSON。 | |||
如果请求被请求"application/vpack",并且处理程序生成“ application / json”,则JSON将转换为VPACK。 | |||
#### VPP | |||
与HTTP相似,不同之处在于:不支持“ Accept”标头,并且"application/json"始终将其转换为“ application / vpack”。这意味着主体包含一个或多个速度包。通常,它将包含一个-值得注意的例外是导入。 | |||
如果处理程序产生了其他东西(即"application/html"),则主体将是一个二进制blob(而不是一个velocy-pack),并且将相应地设置content-type。 | |||
连接后发送的第一个字节(“客户端”端-即使程序是双向的,也有服务器在监听端口,而客户端在连接端口) | |||
VST/1.1\r\n\r\n | |||
(11 Bytes) |
@ -1,6 +1,7 @@ | |||
{erl_opts, [{i, "include"}]}. | |||
{edoc_opts, [{preprocess, true}]}. | |||
{deps, [ | |||
{eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}}, | |||
{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}} | |||
%% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}} | |||
]}. | |||
@ -0,0 +1,77 @@ | |||
-module(agVstAgencyExm). | |||
-compile(inline). | |||
-compile({inline_size, 128}). | |||
-export([ | |||
start_link/3 | |||
, init_it/3 | |||
, system_code_change/4 | |||
, system_continue/3 | |||
, system_get_state/1 | |||
, system_terminate/4 | |||
]). | |||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% | |||
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. | |||
start_link(ServerName, Args, SpawnOpts) -> | |||
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). | |||
init_it(ServerName, Parent, Args) -> | |||
case safeRegister(ServerName) of | |||
true -> | |||
process_flag(trap_exit, true), | |||
moduleInit(Parent, Args); | |||
{false, Pid} -> | |||
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) | |||
end. | |||
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. | |||
system_code_change(MiscState, _Module, _OldVsn, _Extra) -> | |||
{ok, MiscState}. | |||
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. | |||
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> | |||
loop(Parent, SrvState, CliState). | |||
-spec system_get_state(term()) -> {ok, term()}. | |||
system_get_state({_Parent, SrvState, _CliState}) -> | |||
{ok, SrvState}. | |||
-spec system_terminate(term(), pid(), [], term()) -> none(). | |||
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> | |||
terminate(Reason, SrvState, CliState). | |||
safeRegister(ServerName) -> | |||
try register(ServerName, self()) of | |||
true -> true | |||
catch | |||
_:_ -> {false, whereis(ServerName)} | |||
end. | |||
moduleInit(Parent, Args) -> | |||
case agTcpAgencyIns:init(Args) of | |||
{ok, SrvState, CliState} -> | |||
proc_lib:init_ack(Parent, {ok, self()}), | |||
loop(Parent, SrvState, CliState); | |||
{stop, Reason} -> | |||
proc_lib:init_ack(Parent, {error, Reason}), | |||
exit(Reason) | |||
end. | |||
loop(Parent, SrvState, CliState) -> | |||
receive | |||
{system, From, Request} -> | |||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); | |||
{'EXIT', Parent, Reason} -> | |||
terminate(Reason, SrvState, CliState); | |||
Msg -> | |||
{ok, NewSrvState, NewCliState} = agVstAgencyIns:handleMsg(Msg, SrvState, CliState), | |||
loop(Parent, NewSrvState, NewCliState) | |||
end. | |||
terminate(Reason, SrvState, CliState) -> | |||
agTcpAgencyIns:terminate(Reason, SrvState, CliState), | |||
exit(Reason). | |||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% |
@ -0,0 +1,400 @@ | |||
-module(agVstAgencyIns). | |||
-include("agHttpCli.hrl"). | |||
-include("erlArango.hrl"). | |||
-compile(inline). | |||
-compile({inline_size, 128}). | |||
-export([ | |||
%% Inner Behavior API | |||
init/1 | |||
, handleMsg/3 | |||
, terminate/3 | |||
]). | |||
-spec init(term()) -> no_return(). | |||
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> | |||
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), | |||
self() ! ?miDoNetConnect, | |||
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. | |||
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. | |||
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, | |||
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, | |||
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) -> | |||
case Socket of | |||
undefined -> | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}), | |||
{ok, SrvState, CliState}; | |||
_ -> | |||
case BacklogNum >= BacklogSize of | |||
true -> | |||
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}), | |||
{ok, SrvState, CliState}; | |||
_ -> | |||
case Status of | |||
leisure -> %% 空闲模式 | |||
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), | |||
case gen_tcp:send(Socket, Request) of | |||
ok -> | |||
TimerRef = | |||
case OverTime of | |||
infinity -> | |||
undefined; | |||
_ -> | |||
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) | |||
end, | |||
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; | |||
{error, Reason} -> | |||
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) | |||
end; | |||
_ -> | |||
{ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}} | |||
end | |||
end | |||
end; | |||
handleMsg({tcp, Socket, Data}, | |||
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, | |||
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> | |||
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of | |||
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> | |||
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); | |||
[MiRequest | Outs] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
{ok, NewRecvState} -> | |||
{ok, SrvState, CliState#cliState{recvState = NewRecvState}}; | |||
{error, Reason} -> | |||
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) | |||
catch | |||
E:R:S -> | |||
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError}) | |||
end; | |||
handleMsg({timeout, TimerRef, waiting_over}, | |||
#srvState{socket = Socket} = SrvState, | |||
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), | |||
%% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 | |||
gen_tcp:close(Socket), | |||
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); | |||
handleMsg({tcp_closed, Socket}, | |||
#srvState{socket = Socket, serverName = ServerName} = SrvState, | |||
CliState) -> | |||
?WARN(ServerName, "connection closed~n", []), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); | |||
handleMsg({tcp_error, Socket, Reason}, | |||
#srvState{socket = Socket, serverName = ServerName} = SrvState, | |||
CliState) -> | |||
?WARN(ServerName, "connection error: ~p~n", [Reason]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); | |||
handleMsg(?miDoNetConnect, | |||
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, | |||
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> | |||
case ?agBeamPool:getv(PoolName) of | |||
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> | |||
case dealConnect(ServerName, HostName, Port, SocketOpts) of | |||
{ok, Socket} -> | |||
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), | |||
%% 新建连接之后 需要重置之前的buff之类状态数据 | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}}; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); | |||
[MiRequest | Outs] -> | |||
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
{error, _Reason} -> | |||
agAgencyUtils:reconnectTimer(SrvState, CliState) | |||
end; | |||
_Ret -> | |||
?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) | |||
end; | |||
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> | |||
?WARN(ServerName, "unknown msg: ~p~n", [Msg]), | |||
{ok, SrvState, CliState}. | |||
-spec terminate(term(), srvState(), cliState()) -> ok. | |||
terminate(_Reason, | |||
#srvState{socket = Socket} = SrvState, | |||
CliState) -> | |||
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), | |||
ok. | |||
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. | |||
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) -> | |||
case Status of | |||
leisure -> | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState}; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) | |||
end; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); | |||
[MiRequest | Outs] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) | |||
end; | |||
_ -> | |||
overReceiveTcpData(SrvState, CliState) | |||
end. | |||
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. | |||
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, | |||
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, | |||
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> | |||
case erlang:monotonic_time(millisecond) > OverTime of | |||
true -> | |||
%% 超时了 | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; | |||
[MiRequest] -> | |||
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) | |||
end; | |||
[MiRequest] -> | |||
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); | |||
[MiRequest | Outs] -> | |||
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) | |||
end; | |||
_ -> | |||
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), | |||
case gen_tcp:send(Socket, Request) of | |||
ok -> | |||
TimerRef = | |||
case OverTime of | |||
infinity -> | |||
undefined; | |||
_ -> | |||
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) | |||
end, | |||
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}); | |||
{error, Reason} -> | |||
?WARN(ServerName, ":send error: ~p~n", [Reason]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) | |||
end | |||
end. | |||
-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}. | |||
overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, | |||
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> | |||
receive | |||
{tcp, Socket, Data} -> | |||
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of | |||
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> | |||
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); | |||
[MiRequest | Outs] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) | |||
end; | |||
{ok, NewRecvState} -> | |||
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState}); | |||
{error, Reason} -> | |||
?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) | |||
catch | |||
E:R:S -> | |||
?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}}) | |||
end; | |||
{timeout, TimerRef, waiting_over} -> | |||
case CurInfo of | |||
{_PidForm, _RequestId, TimerRef} -> | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:agencyReply(CurInfo, {error, timeout}), | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; | |||
[MiRequest] -> | |||
case ?agBeamPool:getv(PoolName) of | |||
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> | |||
case dealConnect(ServerName, HostName, Port, SocketOpts) of | |||
{ok, NewSocket} -> | |||
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); | |||
{error, _Reason} -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) | |||
end; | |||
_Ret -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) | |||
end; | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
case ?agBeamPool:getv(PoolName) of | |||
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> | |||
case dealConnect(ServerName, HostName, Port, SocketOpts) of | |||
{ok, NewSocket} -> | |||
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); | |||
{error, _Reason} -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) | |||
end; | |||
_Ret -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) | |||
end | |||
end; | |||
[MiRequest] -> | |||
case ?agBeamPool:getv(PoolName) of | |||
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> | |||
case dealConnect(ServerName, HostName, Port, SocketOpts) of | |||
{ok, NewSocket} -> | |||
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); | |||
{error, _Reason} -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) | |||
end; | |||
_Ret -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) | |||
end; | |||
[MiRequest | Outs] -> | |||
case ?agBeamPool:getv(PoolName) of | |||
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> | |||
case dealConnect(ServerName, HostName, Port, SocketOpts) of | |||
{ok, NewSocket} -> | |||
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); | |||
{error, _Reason} -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) | |||
end; | |||
_Ret -> | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) | |||
end | |||
end; | |||
_ -> | |||
?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), | |||
overReceiveTcpData(SrvState, CliState) | |||
end; | |||
{tcp_closed, Socket} -> | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); | |||
{tcp_error, Socket, Reason} -> | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); | |||
#miRequest{} = MiRequest -> | |||
overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); | |||
_Msg -> | |||
?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), | |||
overReceiveTcpData(SrvState, CliState) | |||
end. | |||
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. | |||
dealConnect(ServerName, HostName, Port, SocketOptions) -> | |||
case inet:getaddrs(HostName, inet) of | |||
{ok, IPList} -> | |||
Ip = agMiscUtils:randomElement(IPList), | |||
case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of | |||
{ok, Socket} -> | |||
{ok, Socket}; | |||
{error, Reason} -> | |||
?WARN(ServerName, "connect error: ~p~n", [Reason]), | |||
{error, Reason} | |||
end; | |||
{error, Reason} -> | |||
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]), | |||
{error, Reason} | |||
end. | |||
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. | |||
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, | |||
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, | |||
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> | |||
case erlang:monotonic_time(millisecond) > OverTime of | |||
true -> | |||
%% 超时了 | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), | |||
case RequestsOuts of | |||
[] -> | |||
case RequestsIns of | |||
[] -> | |||
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); | |||
MiRLists -> | |||
[MiRequest | Outs] = lists:reverse(MiRLists), | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) | |||
end; | |||
[MiRequest] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); | |||
[MiRequest | Outs] -> | |||
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) | |||
end; | |||
_ -> | |||
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), | |||
case gen_tcp:send(Socket, Request) of | |||
ok -> | |||
TimerRef = | |||
case OverTime of | |||
infinity -> | |||
undefined; | |||
_ -> | |||
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) | |||
end, | |||
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; | |||
{error, Reason} -> | |||
?WARN(ServerName, ":send error: ~p~n", [Reason]), | |||
gen_tcp:close(Socket), | |||
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), | |||
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) | |||
end | |||
end. | |||
@ -0,0 +1,438 @@ | |||
-module(vst). | |||
-export([ | |||
authorize/1, | |||
connect/2, | |||
request/2, | |||
vst_maxsize/0 | |||
]). | |||
assign_map(_acc@1, _key@1, _value@1) -> | |||
case _acc@1 of | |||
#{_key@1 := _} -> _acc@1; | |||
#{} -> _acc@1#{_key@1 => _value@1}; | |||
_ -> #{_key@1 => _value@1} | |||
end. | |||
assign_split([<<>>, _rest@1], _value@1, _acc@1, _pattern@1) -> | |||
_parts@1 = binary:split(_rest@1, _pattern@1), | |||
case _acc@1 of | |||
[_ | _] -> | |||
[assign_split(_parts@1, _value@1, none, _pattern@1) | _acc@1]; | |||
none -> | |||
[assign_split(_parts@1, _value@1, none, _pattern@1)]; | |||
_ -> _acc@1 | |||
end; | |||
assign_split([_key@1, _rest@1], _value@1, _acc@1, _pattern@1) -> | |||
_parts@1 = binary:split(_rest@1, _pattern@1), | |||
case _acc@1 of | |||
#{_key@1 := _current@1} -> | |||
_acc@1#{_key@1 => | |||
assign_split(_parts@1, _value@1, _current@1, _pattern@1)}; | |||
#{} -> | |||
_acc@1#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)}; | |||
_ -> | |||
#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)} | |||
end; | |||
assign_split([<<>>], nil, _acc@1, __pattern@1) -> | |||
case _acc@1 of | |||
[_ | _] -> _acc@1; | |||
_ -> [] | |||
end; | |||
assign_split([<<>>], _value@1, _acc@1, __pattern@1) -> | |||
case _acc@1 of | |||
[_ | _] -> [_value@1 | _acc@1]; | |||
none -> [_value@1]; | |||
_ -> _acc@1 | |||
end; | |||
assign_split([_key@1], _value@1, _acc@1, __pattern@1) -> | |||
assign_map(_acc@1, _key@1, _value@1). | |||
authorize(#{socket := _socket@1, username := _un@1, password := _pw@1} = _state@1) -> | |||
case eVPack:encode([1, 1000, <<"plain">>, _un@1, _pw@1]) of | |||
{ok, _auth@1} -> | |||
case send_stream(_socket@1, build_stream(_auth@1)) of | |||
ok -> | |||
case recv_header(_socket@1) of | |||
{ok, _header@1} -> | |||
case recv_stream(_socket@1, _header@1) of | |||
{ok, _stream@1} -> | |||
case decode_stream(_stream@1) of | |||
{ok, [[1, 2, 200, __headers@2] | __body@1]} -> | |||
ok; | |||
_@1 -> | |||
case _@1 of | |||
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> | |||
{error, #{ | |||
'__exception__' => true, | |||
error_num => nil, | |||
status => _status@1, | |||
message => proplists:get_value(<<"errorMessage">>, _body@1), | |||
endpoint => | |||
case _state@1 of | |||
#{endpoint := _@3} -> | |||
_@3; | |||
_@3 when erlang:is_map(_@3) -> | |||
erlang:error({badkey, endpoint, _@3}); | |||
_@3 -> | |||
_@3:endpoint() | |||
end}}; | |||
{error, _reason@1} -> | |||
{error, _reason@1}; | |||
_@2 -> | |||
erlang:error({with_clause, _@2}) | |||
end | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{ok, | |||
[[1, 2, _status@1, __headers@1], _body@1 | _]} -> | |||
{error, #{ | |||
'__exception__' => true, | |||
error_num => nil, | |||
status => _status@1, | |||
message => proplists:get_value(<<"errorMessage">>, _body@1), | |||
endpoint => | |||
case _state@1 of | |||
#{endpoint := _@3} -> | |||
_@3; | |||
_@3 when erlang:is_map(_@3) -> | |||
erlang:error({badkey, endpoint, _@3}); | |||
_@3 -> _@3:endpoint() | |||
end}}; | |||
{error, _reason@1} -> | |||
{error, _reason@1}; | |||
_@2 -> erlang:error({with_clause, _@2}) | |||
end | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{ok, | |||
[[1, 2, _status@1, __headers@1], _body@1 | |||
| _]} -> | |||
{error, #{ | |||
'__exception__' => true, | |||
error_num => nil, status => _status@1, | |||
message => proplists:get_value(<<"errorMessage">>, _body@1), | |||
endpoint => | |||
case _state@1 of | |||
#{endpoint := _@3} -> _@3; | |||
_@3 when erlang:is_map(_@3) -> | |||
erlang:error({badkey, | |||
endpoint, | |||
_@3}); | |||
_@3 -> _@3:endpoint() | |||
end}}; | |||
{error, _reason@1} -> {error, _reason@1}; | |||
_@2 -> erlang:error({with_clause, _@2}) | |||
end | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> | |||
{error, | |||
#{ | |||
'__exception__' => true, error_num => nil, | |||
status => _status@1, | |||
message => proplists:get_value(<<"errorMessage">>, _body@1), | |||
endpoint => | |||
case _state@1 of | |||
#{endpoint := _@3} -> _@3; | |||
_@3 when erlang:is_map(_@3) -> | |||
erlang:error({badkey, | |||
endpoint, | |||
_@3}); | |||
_@3 -> _@3:endpoint() | |||
end}}; | |||
{error, _reason@1} -> {error, _reason@1}; | |||
_@2 -> erlang:error({with_clause, _@2}) | |||
end | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> | |||
{error, #{ | |||
'__exception__' => true, | |||
error_num => nil, | |||
status => _status@1, | |||
message => proplists:get_value(<<"errorMessage">>, _body@1), | |||
endpoint => | |||
case _state@1 of | |||
#{endpoint := _@3} -> _@3; | |||
_@3 when erlang:is_map(_@3) -> | |||
erlang:error({badkey, endpoint, _@3}); | |||
_@3 -> _@3:endpoint() | |||
end}}; | |||
{error, _reason@1} -> {error, _reason@1}; | |||
_@2 -> erlang:error({with_clause, _@2}) | |||
end | |||
end. | |||
body_for(<<>>) -> {ok, <<>>}; | |||
body_for(_body@1) -> | |||
eVPack:encode(_body@1). | |||
body_from([]) -> nil; | |||
body_from([_body@1]) -> _body@1; | |||
body_from(_body@1) -> _body@1. | |||
build_stream(_message@1) -> | |||
case chunk_every(_message@1, 30696) of | |||
[_first_chunk@1 | _rest_chunks@1] -> | |||
_n_chunks@1 = erlang:length([_first_chunk@1 | |||
| _rest_chunks@1]), | |||
_msg_length@1 = erlang:byte_size(_message@1) + | |||
_n_chunks@1 * 24, | |||
_rest_chunks@2 = | |||
lists:reverse(lists:flodl( | |||
fun(_n@1, _@1) -> | |||
case _rest_chunks@1 /= [] of | |||
true -> | |||
[prepend_chunk(lists:nth(_n@1, _rest_chunks@1), _n@1, 0, 0, _msg_length@1) | _@1]; | |||
false -> _@1 | |||
end | |||
end, [], lists:seq(1, erlang:length(_rest_chunks@1)))), | |||
[prepend_chunk(_first_chunk@1, _n_chunks@1, 1, 0, _msg_length@1) | _rest_chunks@2]; | |||
_only_chunk@1 -> | |||
prepend_chunk(_only_chunk@1, 1, 1, 0, erlang:byte_size(_message@1) + 24) | |||
end. | |||
chunk_every(_bytes@1, _size@1) when erlang:byte_size(_bytes@1) =< _size@1 -> | |||
_bytes@1; | |||
chunk_every(_bytes@1, _size@1) -> | |||
<<_chunk@1:_size@1/binary, _rest@1/binary>> = _bytes@1, | |||
[_chunk@1 | (chunk_every(_rest@1, _size@1))]. | |||
connect(#{addr := _addr@1, 'ssl?' := _ssl}, _opts@1) -> | |||
_mod@1 = case _ssl of | |||
_@1 when _@1 =:= false orelse _@1 =:= nil -> gen_tcp; | |||
_ -> ssl | |||
end, | |||
_transport_opts@1 = case _ssl of | |||
_@2 when _@2 =:= false orelse _@2 =:= nil -> | |||
tcp_opts; | |||
_ -> ssl_opts | |||
end, | |||
_transport_opts@2 = proplists:get_value(_transport_opts@1, _opts@1, []), | |||
_connect_timeout@1 = proplists:get(connect_timeout, _opts@1, 5000), | |||
_options@1 = lists:merge(_transport_opts@2, [{packet, raw}, {mode, binary}, {active, false}]), | |||
case _mod@1:connect(addr_for(_addr@1), port_for(_addr@1), _options@1, _connect_timeout@1) of | |||
{ok, _port@1} -> | |||
case _mod@1:send(_port@1, <<"VST/1.1\r\n\r\n">>) of | |||
ok -> {ok, {_mod@1, _port@1}}; | |||
_@3 -> _@3 | |||
end; | |||
_@3 -> _@3 | |||
end. | |||
decode_pair({_key@1, _value@1}, _acc@1) -> | |||
case case _key@1 /= <<>> of | |||
false -> false; | |||
true -> binary:last(_key@1) == 93 | |||
end | |||
of | |||
false -> assign_map(_acc@1, _key@1, _value@1); | |||
true -> | |||
_subkey@1 = binary:part(_key@1, | |||
0, | |||
erlang:byte_size(_key@1) - 1), | |||
assign_split(binary:split(_subkey@1, <<"[">>), | |||
_value@1, | |||
_acc@1, | |||
binary:compile_pattern(<<"][">>)) | |||
end. | |||
decode_stream(_@1) -> decode_stream(_@1, []). | |||
decode_stream(<<>>, _acc@1) -> {ok, _acc@1}; | |||
decode_stream(_stream@1, _acc@1) -> | |||
case eVPack:decode(_stream@1) of | |||
{ok, {_term@1, _rest@1}} -> | |||
decode_stream(_rest@1, erlang:'++'(_acc@1, [_term@1])); | |||
{ok, _term@2} -> {ok, erlang:'++'(_acc@1, [_term@2])}; | |||
{error, _reason@1} -> {error, _reason@1} | |||
end. | |||
headers_for(#{} = _headers@1) -> _headers@1; | |||
headers_for(_headers@1) | |||
when erlang:is_list(_headers@1) -> | |||
maps:from_list(_headers@1). | |||
method_for(delete) -> 0; | |||
method_for(get) -> 1; | |||
method_for(post) -> 2; | |||
method_for(put) -> 3; | |||
method_for(head) -> 4; | |||
method_for(patch) -> 5; | |||
method_for(options) -> 6; | |||
method_for(_) -> -1. | |||
port_for({unix, __path@1}) -> 0; | |||
port_for({tcp, __host@1, _port@1}) -> _port@1. | |||
prepend_chunk(_chunk@1, _chunk_n@1, _is_first@1, | |||
_msg_id@1, _msg_length@1) -> | |||
<<(24 + erlang:byte_size(_chunk@1)):32/integer-little, | |||
(binary:decode_unsigned(<<_chunk_n@1:31/integer, | |||
_is_first@1:1/integer>>, | |||
little)):32/integer, | |||
_msg_id@1:64/integer-little, | |||
_msg_length@1:64/integer-little, _chunk@1/binary>>. | |||
query_for(nil) -> #{}. | |||
recv_chunk({_mod@1, _port@1}, _chunk_length@1) -> | |||
_mod@1:recv(_port@1, _chunk_length@1 - 24). | |||
recv_header({_mod@1, _port@1}) -> | |||
case _mod@1:recv(_port@1, 24) of | |||
{ok, | |||
<<_chunk_length@1:32/integer-little, | |||
_chunk_x@1:32/integer, _msg_id@1:64/integer-little, | |||
_msg_length@1:64/integer-little>>} -> | |||
<<_chunk_n@1:31/integer, _is_first@1:1/integer>> = | |||
<<_chunk_x@1:32/integer-little>>, | |||
{ok, | |||
[_chunk_length@1, | |||
_chunk_n@1, | |||
_is_first@1, | |||
_msg_id@1, | |||
_msg_length@1]}; | |||
{error, _reason@1} -> {error, _reason@1} | |||
end. | |||
recv_stream(_socket@1, | |||
[_chunk_length@1, 1, 1, __msg_id@1, __msg_length@1]) -> | |||
recv_chunk(_socket@1, _chunk_length@1); | |||
recv_stream(_socket@1, | |||
[_chunk_length@1, | |||
_n_chunks@1, | |||
1, | |||
__msg_id@1, | |||
__msg_length@1]) -> | |||
case recv_chunk(_socket@1, _chunk_length@1) of | |||
{ok, _buffer@1} -> | |||
case recv_stream(_socket@1, _n_chunks@1, _buffer@1) of | |||
{ok, _stream@1} -> {ok, _stream@1}; | |||
_@1 -> _@1 | |||
end; | |||
_@1 -> _@1 | |||
end. | |||
recv_stream(_socket@1, _n_chunks@1, _buffer@1) -> | |||
lists:reduce_while(lists:seq(1, _n_chunks@1 - 1), | |||
_buffer@1, | |||
fun(_n@1, _buffer@2) -> | |||
case recv_header(_socket@1) of | |||
{ok, [_chunk_length@1, _, _, _, _]} -> | |||
case recv_chunk(_socket@1, _chunk_length@1) of | |||
{ok, _chunk@1} -> | |||
case _n@1 == _n_chunks@1 - 1 of | |||
false -> | |||
{cont, <<_buffer@2/binary, _chunk@1/binary>>}; | |||
true -> | |||
{halt, {ok, <<_buffer@2/binary, _chunk@1/binary>>}} | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{error, _reason@1} -> | |||
{halt, {error, _reason@1}}; | |||
_@2 -> | |||
erlang:error({with_clause, _@2}) | |||
end | |||
end; | |||
_@1 -> | |||
case _@1 of | |||
{error, _reason@1} -> | |||
{halt, {error, _reason@1}}; | |||
_@2 -> | |||
erlang:error({with_clause, _@2}) | |||
end | |||
end | |||
end). | |||
request(#{method := _method@1, path := _path@1, headers := _headers@1, body := _body@1}, #{socket := _socket@1, database := _database@1} = _state@1) -> | |||
#{path := _path@2, query := _query@1} = http_uri:parse(_path@1), | |||
{_database@3, _path@4} = | |||
case _path@2 of | |||
<<"/_db/", _rest@1/binary>> -> | |||
[_database@2, _path@3] = binary:split(_rest@1, <<"/">>), | |||
{_database@2, <<"/", _path@3/binary>>}; | |||
_ -> | |||
{case _database@1 of | |||
_@1 when _@1 =:= false orelse _@1 =:= nil -> | |||
<<>>; | |||
_@2 -> _@2 | |||
end, _path@2} | |||
end, | |||
_request@1 = [1, 1, _database@3, method_for(_method@1), _path@4, query_for(_query@1), headers_for(_headers@1)], | |||
case eVPack:encode(_request@1) of | |||
{ok, _request@2} -> | |||
case body_for(_body@1) of | |||
{ok, _body@2} -> | |||
case send_stream(_socket@1, build_stream(<<_request@2/binary, _body@2/binary>>)) | |||
of | |||
ok -> | |||
case recv_header(_socket@1) of | |||
{ok, _header@1} -> | |||
case recv_stream(_socket@1, _header@1) of | |||
{ok, _stream@1} -> | |||
case decode_stream(_stream@1) of | |||
{ok, [[1, 2, _status@1, _headers@2] | _body@3]} -> | |||
{ok, #{status => _status@1, headers => _headers@2, body => body_from(_body@3)}, _state@1}; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> | |||
{error, noproc, _state@1}; | |||
{error, _reason@1} -> | |||
{error, _reason@1, _state@1}; | |||
_@4 -> | |||
erlang:error({with_clause, _@4}) | |||
end | |||
end; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> | |||
{error, noproc, _state@1}; | |||
{error, _reason@1} -> | |||
{error, _reason@1, _state@1}; | |||
_@4 -> | |||
erlang:error({with_clause, _@4}) | |||
end | |||
end; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> | |||
{error, noproc, _state@1}; | |||
{error, _reason@1} -> | |||
{error, _reason@1, _state@1}; | |||
_@4 -> erlang:error({with_clause, _@4}) | |||
end | |||
end; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> {error, noproc, _state@1}; | |||
{error, _reason@1} -> | |||
{error, _reason@1, _state@1}; | |||
_@4 -> erlang:error({with_clause, _@4}) | |||
end | |||
end; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> {error, noproc, _state@1}; | |||
{error, _reason@1} -> {error, _reason@1, _state@1}; | |||
_@4 -> erlang:error({with_clause, _@4}) | |||
end | |||
end; | |||
_@3 -> | |||
case _@3 of | |||
{error, closed} -> {error, noproc, _state@1}; | |||
{error, _reason@1} -> {error, _reason@1, _state@1}; | |||
_@4 -> erlang:error({with_clause, _@4}) | |||
end | |||
end. | |||
send_stream(Socket, IoData) -> | |||
tcp:send(Socket, IoData). | |||
vst_maxsize() -> 30720. |