From c6b39de0c2f5f92b976f6bf4b6f06ab864e4a34f Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Sun, 29 Nov 2020 22:47:43 +0800 Subject: [PATCH] =?UTF-8?q?vst=E7=9B=B8=E5=85=B3=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- VelocyStream.md | 286 ++++++++++++++++++++ VelocyStream_zh.md | 188 +++++++++++++ rebar.config | 1 + src/agHttpCli/agVstAgencyExm.erl | 77 ++++++ src/agHttpCli/agVstAgencyIns.erl | 400 ++++++++++++++++++++++++++++ src/agHttpCli/vst.erl | 438 +++++++++++++++++++++++++++++++ 6 files changed, 1390 insertions(+) create mode 100644 VelocyStream.md create mode 100644 VelocyStream_zh.md create mode 100644 src/agHttpCli/agVstAgencyExm.erl create mode 100644 src/agHttpCli/agVstAgencyIns.erl create mode 100644 src/agHttpCli/vst.erl diff --git a/VelocyStream.md b/VelocyStream.md new file mode 100644 index 0000000..8857581 --- /dev/null +++ b/VelocyStream.md @@ -0,0 +1,286 @@ +Client / Server Communication (VST 1.1) +======================================= + +Version 1.1.0 of 23 November 2016 + +HTTP +---- + +Use VelocyPack as body. Content-Type is `"application/vpack"` + +Binary Protocol +--------------- + +This is not a request / response protocol. It is symmetric (in +principle). Messages can be sent back and forth, pipelined, multiplexed, +uni-directional or bi-directional. + +It is possible that a message generates + +- no response + +- exactly one response + +- multiple responses + +The VelocyStream does **not** impose or specify or require one of the +above behaviors. The application must define a behavior in general or on +a per-request basis, see below. The server and client must then +implement that behavior. + +### Message vs. Chunk + +The consumer (client or server) will deal with messages. A message +consists of one or more VelocyPacks (or in some cases of certain parts +of binary data). How many VelocyPacks are part of a message is +completely application dependent, see below for ArangoDB. + +It is possible that the messages are very large. As messages can be +multiplexed over one connection, large messages need to be split into +chunks. The sender/receiver class will accept a vector of VelocyPacks, +split them into chunks, send these chunks over the wire, assemble these +chunks, generates a vector of VelocyPacks and passes this to the +consumer. + +### Chunks + +In order to allow reassemble chunks, each package is prefixed by a small +header. A chunk is always at least 24 bytes long. The byte order is +ALWAYS little endian. The format of a chunk is the following, regardless +on whether it is the first in a message or a subsequent one: + + +| name | type | description | +| --------------- | ------------------------- | --- | +| length | uint32\_t | total length in bytes of the current chunk, including this header | +| chunkX | uint32\_t | chunk/isFirstChunk (upper 31bits/lowest bit), details see below | +| messageId | uint64\_t | a unique identifier, it is the responsibility of the sender to generate such an identifier (zero is reserved for not set ID) | +| messageLength | uint64\_t | the total size of the message. | +| Binary data | binary data blob | size b1 | + + +Clarification: "chunk" and "isFirstChunk" are combined into an unsigned +32bit value. Therefore it will be encoded as + + uint32_t chunkX + +and extracted as + + chunk = chunkX >> 1 + + isFirstChunk = chunkX & 0x1 + +For the first chunk of a message, the low bit of the second uint32\_t is +set, for all subsequent ones it is reset. In the first chunk of a +message, the number "chunk" is the total number of chunks in the +message, in all subsequent chunks, the number "chunk" is the current +number of this chunk. + +The total size of the data package is (24 + b1) bytes. This number is +stored in the length field. If one needs to send messages larger than +UINT32\_MAX, then these messages must be chunked. In general it is a +good idea to restrict the maximal size to a few megabytes. + +**Notes:** + +When sending a (small) message, it is important (for performance reasons) +to ensure that only one TCP +packet is sent. For example, by using sendmmsg under Linux +([*https://blog.cloudflare.com/how-to-receive-a-million-packets/*](https://blog.cloudflare.com/how-to-receive-a-million-packets/)) + +Implementors should nevertheless be aware that in TCP/IP one cannot +enforce this and so implementations must always be aware that some part +of the network stack can split packets and the payload might arrive in +multiple parts! + +ArangoDB +======== + +### Request / Response + +For an ArangoDB client, the request is of the following format, the +array is a VelocyPack array: + + [ + /* 0 - version: */ 1, // [int] + /* 1 - type: */ 1, // [int] 1=Req, 2=Res,.. + /* 2 - database: */ "test", // [string] + /* 3 - requestType: */ 1, // [int] 0=Delete, ... + /* 4 - request: */ "/_api/collection", // [string\] + /* 5 - parameter: */ { force: true }, // [[string]->[string]] + /* 6 - meta: */ { x-arangodb: true } // [[string]->[string]] + ] + + Body (binary data) + +If database is missing (entry is `null`), then "\_system" is assumed. + +`type`: + + 1 = Request + 2 = Response (final response for this message id) + 3 = Response (but at least one more response will follow) + 1000 = Authentication + +`requestType`: + + 0 = DELETE + 1 = GET + 2 = POST + 3 = PUT + 4 = HEAD (not used in VPP) + 5 = PATCH + 6 = OPTIONS (not used in VPP) + +For example: + +The HTTP request + + http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3 + +With header: + + X-ArangoDB-Async: true + +is equivalent to + + [ + 1, // version + 1, // type + "test", // database + 1, // requestType GET + "/_admin/echo", // request path + { // parameters + a: 1, + b: 2, + c: [ 1, 3 ] + }, + { // meta + x-arangodb-async: true + } + ] + +The request is a message beginning with one VelocyPack. This VelocyPack +always contains the header fields, parameters and request path. If the +meta field does not contain a content type, then the default +`"application/vpack"` is assumed and the body will be one or multiple +VelocyPack object. + +The response will be + + [ + 1, // 0 - version + 2 or 3, // 1 - type + 400, // 2 - responseCode + { etag: "1234" } // 3 - meta: [[str]->[str]] + ] + + Body (binary data) + +Request can be pipelined or mixed. The responses are mapped using the +"messageId" in the header. It is the responsibility of the **sender** to +generate suitable "messageId" values. + +The default content-type is `"application/vpack"`. + +### Authentication + +A connection can be authenticated with the following message: + + [ + 1, // version + 1000, // type + "plain", // encryption + "admin", // user + "plaintext", // password + ] + +or + + [ + 1, // version + 1000, // type + "jwt", // encryption + "abcd..." // token + ] + +The response is + + { "error": false } + +if successful or + + { + "error": true, + "errorMessage": "MESSAGE", + "errorCode": CODE + } + +if not successful, and in this case the connection is closed by the server. +One can acquire a JWT token in the same way as with HTTP using the +open, unauthenticated route `/_open/auth` with the same semantics as +in the HTTP version. In this way, the complete authentication can be +done in a single session via JWT. + + +### Content-Type and Accept + +In general the content-type will be VPP, that is the body is an object +stored as VelocyPack. + +Sometimes it is necessary to respond with unstructured data, like text, +css or html. The body will be a VelocyPack object containing just a +binary attribute and the content-type will be set accordingly. + +The rules are as follows. + +#### Http + +Request: Content-Type + +- `"application/json"`: the body contains the JSON string representation + +- `"application/vpack"`: the body contains a velocy pack + +There are some handler that allow lists of JSON (seperared by newline). +In this case we also allow multiple velocy packs without any separator. + +Request: Accept + +- `"application/json"`: send a JSON string representation in the body, + if possible + +- `"application/vpack"`: send velocy pack in the body, if possible + +If the request asked for `"application/json"` or `"application/vpack"` and +the handler produces something else (i.e. `"application/html"`), then the +accept is ignored. + +If the request asked `"application/json"` and the handler produces +`"application/vpack"`, then the VPACK is converted into JSON. + +If the request asked `"application/vpack"` and the handler produces +"application/json", then the JSON is converted into VPACK. + +#### VPP + +Similar to HTTP with the exception: the "Accept" header is not supported +and `"application/json"` will always be converted into +"application/vpack". This means that the body contains one or more +velocy-packs. In general it will contain one - notable exception being +the import. + +If the handler produces something else (i.e. `"application/html"`), then +The body will be a binary blob (instead of a velocy-pack) and the +content-type will be set accordingly. + +The first bytes sent after a connection (the "client" side - even if the +program is bi-directional, there is a server listening to a port and a +client connecting to a port) are + + VST/1.1\r\n\r\n + +(11 Bytes) + + + diff --git a/VelocyStream_zh.md b/VelocyStream_zh.md new file mode 100644 index 0000000..b703a48 --- /dev/null +++ b/VelocyStream_zh.md @@ -0,0 +1,188 @@ +Client / Server Communication (VST 1.1) +======================================= + +Version 1.1.0 of 23 November 2016 + +# HTTP + 使用VelocyPack作为身体。内容类型为"application/vpack" + +Binary Protocol +--------------- + 这不是请求/响应协议。它是对称的(原则上)。消息可以来回发送,流水线,多路复用,单向或双向。 + 可能会生成一条消息 + 没有反应 + 只是一个回应 + 多重回应 + + 该VelocyStream并没有强加或指定或要求上述行为之一。 + 应用程序必须在一般情况下或根据每个请求定义行为, + 请参见下文。然后,服务器和客户端必须实现该行为。 + +### Message vs. Chunk + 使用者(客户端或服务器)将处理消息。一条消息包含一个或多个VelocyPack(在某些情况下是二进制数据的某些部分)。 + 消息中有多少VelocyPacks完全取决于应用程序,请参阅下文了解ArangoDB。 + + 消息可能很大。由于可以通过一个连接对消息进行多路复用,因此需要将大消息拆分为多个块。 + 发送者/接收者类将接受一个VelocyPacks向量,将其分割成块,通过导线发送这些块,组装这些块, + 生成一个VelocyPacks向量并将其传递给消费者。 + +### Chunks + 为了允许重组块,每个程序包都以一个 header 作为前缀。块始终至少为24个字节长。字节顺序始终是小端。 + 块的格式如下,无论它是消息中的第一个消息还是随后的消息: + + 名称 类型 描述 + length uint32_t 当前块(包括此标头)的总长度(以字节为单位) + chunkX uint32_t chunk / isFirstChunk(高31位/最低位),详细信息请参见下文 + messageId uint64_t 唯一标识符,发送方有责任生成这样的标识符(zero is reserved for not set ID) + messageLength uint64_t the total size of the message. + Binary data binary data blob size b1 | + + 声明:"chunk" and "isFirstChunk" are combined into an unsigned 32bit value. + Therefore it will be encoded as + uint32_t chunkX and extracted as + + chunk = chunkX >> 1 + isFirstChunk = chunkX & 0x1 + + 对于消息的第一块,设置第二个uint32_t的低位,对于所有后续消息,将其复位。 + 在消息的第一个块中,数字 chunk 是消息中的块总数,在所有后续块中,数字 chunk 是该块的当前number。 + + 数据包的总大小为(24 + b1)字节。此数字存储在length字段中。 + 如果需要发送大于UINT32_MAX的消息,则必须将这些消息分块。通常,最好将最大大小限制为几兆字节。 + +### **Notes:** + 发送(小)消息时,(出于性能原因)确保仅发送一个TCP数据包非常重要。 + 例如,通过在Linux下使用sendmmsg + ([https://blog.cloudflare.com/how-to-receive-a-million-packets](https://blog.cloudflare.com/how-to-receive-a-million-packets/)) + 但是,实现者应该意识到,在TCP / IP中不能强制执行此操作,因此实现者必须始终意识到, + 网络堆栈的某些部分可以拆分数据包,并且有效负载可能分成多个部分! + +ArangoDB +======== +### Request / Response + 对于ArangoDB客户端,请求的格式如下,该数组是VelocyPack数组: + [ + /* 0 - version: */ 1, // [int] + /* 1 - type: */ 1, // [int] 1=Req, 2=Res,.. + /* 2 - database: */ "test", // [string] + /* 3 - requestType: */ 1, // [int] 0=Delete, ... + /* 4 - request: */ "/_api/collection", // [string\] + /* 5 - parameter: */ { force: true }, // [[string]->[string]] http的请求参数列表 + /* 6 - meta: */ { x-arangodb: true } // [[string]->[string]] http的header + ] + Body (binary data) + + 如果数据库未设置(entry is `null`),则数据库为“ _system”。 + +#### type: + 1 = Request + 2 = Response (final response for this message id) + 3 = Response (but at least one more response will follow) + 1000 = Authentication +#### requestType: + 0 = DELETE + 1 = GET + 2 = POST + 3 = PUT + 4 = HEAD (not used in VPP) + 5 = PATCH + 6 = OPTIONS (not used in VPP) + +### For example: + HTTP请求 + http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3 + With header:: + X-ArangoDB-Async: true + is equivalent to + [ + 1, // version + 1, // type + "test", // database + 1, // requestType GET + "/_admin/echo", // request path + { // parameters + a: 1, + b: 2, + c: [ 1, 3 ] + }, + { // meta + x-arangodb-async: true + } + ] + + 该请求是一条以VelocyPack开头的消息。这个VelocyPack始终包含标题字段,参数和请求路径。 + 如果meta字段不包含内容类型,则采用默认值 "application/vpack",并且正文将是一个或多个VelocyPack对象。 + +#### The response will be + [ + 1, // 0 - version + 2 or 3, // 1 - type + 400, // 2 - responseCode + { etag: "1234" } // 3 - meta: [[str]->[str]] + ] + +#### Body (binary data) + 请求可以通过管道传输或混合。使用标头中的“ messageId”映射响应。发送者有责任生成合适的“ messageId”值。 + + 默认内容类型为"application/vpack"。 + +### Authentication + A connection can be authenticated with the following message: + [ + 1, // version + 1000, // type + "plain", // encryption + "admin", // user + "plaintext", // password + ] + or + [ + 1, // version + 1000, // type + "jwt", // encryption + "abcd..." // token + ] + The response is + { "error": false } + + if successful or + { + "error": true, + "errorMessage": "MESSAGE", + "errorCode": CODE + } + + 如果不成功,则在这种情况下服务器将关闭连接。可以使用/_open/auth与HTTP版本相同的语义,使用未经身份验证的开放式路由,以与HTTP相同的方式获取JWT令牌。这样,可以通过JWT在单个会话中完成完整的身份验证。 + +### Content-Type and Accept + 通常,内容类型将是VPP,即主体是存储为VelocyPack的对象。 + 有时有必要使用非结构化数据(例如文本,css或html)进行响应。主体将是一个仅包含二进制属性的VelocyPack对象,并将相应地设置content-type。 + +规则如下。 + +#### Http + Request: Content-Type + "application/json"`: the body contains the JSON string representation + `"application/vpack"`: the body contains a velocy pack + + 有一些处理程序允许JSON列表(由换行符分隔)。在这种情况下,我们还允许不使用任何分隔符的多个速度包。 + + Request: Accept + "application/json":如果可能,在正文中发送JSON字符串表示形式 + "application/vpack":如果可能的话,在体内发送速度包 + + 如果请求是"application/json"或"application/vpack"处理程序产生了其他请求(即"application/html"),那么将忽略接受。 + + 如果请求被请求"application/json"并且处理程序产生 "application/vpack",则VPACK将转换为JSON。 + + 如果请求被请求"application/vpack",并且处理程序生成“ application / json”,则JSON将转换为VPACK。 + +#### VPP + 与HTTP相似,不同之处在于:不支持“ Accept”标头,并且"application/json"始终将其转换为“ application / vpack”。这意味着主体包含一个或多个速度包。通常,它将包含一个-值得注意的例外是导入。 + + 如果处理程序产生了其他东西(即"application/html"),则主体将是一个二进制blob(而不是一个velocy-pack),并且将相应地设置content-type。 + + 连接后发送的第一个字节(“客户端”端-即使程序是双向的,也有服务器在监听端口,而客户端在连接端口) + + VST/1.1\r\n\r\n +(11 Bytes) \ No newline at end of file diff --git a/rebar.config b/rebar.config index de134f1..ad795f0 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {erl_opts, [{i, "include"}]}. {edoc_opts, [{preprocess, true}]}. {deps, [ + {eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}}, {jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}} %% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}} ]}. diff --git a/src/agHttpCli/agVstAgencyExm.erl b/src/agHttpCli/agVstAgencyExm.erl new file mode 100644 index 0000000..40bff83 --- /dev/null +++ b/src/agHttpCli/agVstAgencyExm.erl @@ -0,0 +1,77 @@ +-module(agVstAgencyExm). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + start_link/3 + , init_it/3 + , system_code_change/4 + , system_continue/3 + , system_get_state/1 + , system_terminate/4 +]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}. +start_link(ServerName, Args, SpawnOpts) -> + proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts). + +init_it(ServerName, Parent, Args) -> + case safeRegister(ServerName) of + true -> + process_flag(trap_exit, true), + moduleInit(Parent, Args); + {false, Pid} -> + proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}}) + end. + +-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}. +system_code_change(MiscState, _Module, _OldVsn, _Extra) -> + {ok, MiscState}. + +-spec system_continue(pid(), [], {module(), term(), term()}) -> ok. +system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) -> + loop(Parent, SrvState, CliState). + +-spec system_get_state(term()) -> {ok, term()}. +system_get_state({_Parent, SrvState, _CliState}) -> + {ok, SrvState}. + +-spec system_terminate(term(), pid(), [], term()) -> none(). +system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) -> + terminate(Reason, SrvState, CliState). + +safeRegister(ServerName) -> + try register(ServerName, self()) of + true -> true + catch + _:_ -> {false, whereis(ServerName)} + end. + +moduleInit(Parent, Args) -> + case agTcpAgencyIns:init(Args) of + {ok, SrvState, CliState} -> + proc_lib:init_ack(Parent, {ok, self()}), + loop(Parent, SrvState, CliState); + {stop, Reason} -> + proc_lib:init_ack(Parent, {error, Reason}), + exit(Reason) + end. + +loop(Parent, SrvState, CliState) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState}); + {'EXIT', Parent, Reason} -> + terminate(Reason, SrvState, CliState); + Msg -> + {ok, NewSrvState, NewCliState} = agVstAgencyIns:handleMsg(Msg, SrvState, CliState), + loop(Parent, NewSrvState, NewCliState) + end. + +terminate(Reason, SrvState, CliState) -> + agTcpAgencyIns:terminate(Reason, SrvState, CliState), + exit(Reason). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/src/agHttpCli/agVstAgencyIns.erl b/src/agHttpCli/agVstAgencyIns.erl new file mode 100644 index 0000000..2df0274 --- /dev/null +++ b/src/agHttpCli/agVstAgencyIns.erl @@ -0,0 +1,400 @@ +-module(agVstAgencyIns). +-include("agHttpCli.hrl"). +-include("erlArango.hrl"). + +-compile(inline). +-compile({inline_size, 128}). + +-export([ + %% Inner Behavior API + init/1 + , handleMsg/3 + , terminate/3 +]). + +-spec init(term()) -> no_return(). +init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) -> + ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max), + self() ! ?miDoNetConnect, + {ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}. + +-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}. +handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) -> + case Socket of + undefined -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}), + {ok, SrvState, CliState}; + _ -> + case BacklogNum >= BacklogSize of + true -> + ?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}), + {ok, SrvState, CliState}; + _ -> + case Status of + leisure -> %% 空闲模式 + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}), + agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}}) + end; + _ -> + {ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}} + end + end + end; +handleMsg({tcp, Socket, Data}, + #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> + agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {ok, NewRecvState} -> + {ok, SrvState, CliState#cliState{recvState = NewRecvState}}; + {error, Reason} -> + ?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) + catch + E:R:S -> + ?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError}) + end; +handleMsg({timeout, TimerRef, waiting_over}, + #srvState{socket = Socket} = SrvState, + #cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) -> + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + %% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 + gen_tcp:close(Socket), + handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); +handleMsg({tcp_closed, Socket}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?WARN(ServerName, "connection closed~n", []), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); +handleMsg({tcp_error, Socket, Reason}, + #srvState{socket = Socket, serverName = ServerName} = SrvState, + CliState) -> + ?WARN(ServerName, "connection error: ~p~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); +handleMsg(?miDoNetConnect, + #srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState, + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, Socket} -> + NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState), + %% 新建连接之后 需要重置之前的buff之类状态数据 + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {error, _Reason} -> + agAgencyUtils:reconnectTimer(SrvState, CliState) + end; + _Ret -> + ?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret]) + end; +handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> + ?WARN(ServerName, "unknown msg: ~p~n", [Msg]), + {ok, SrvState, CliState}. + +-spec terminate(term(), srvState(), cliState()) -> ok. +terminate(_Reason, + #srvState{socket = Socket} = SrvState, + CliState) -> + {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), + ok. + +-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}. +overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) -> + case Status of + leisure -> + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs}) + end; + _ -> + overReceiveTcpData(SrvState, CliState) + end. + +-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. +overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> + case erlang:monotonic_time(millisecond) > OverTime of + true -> + %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + _ -> + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}); + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), + agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) + end + end. + +-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}. +overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) -> + receive + {tcp, Socket, Data} -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} -> + agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}), + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {ok, NewRecvState} -> + overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState}); + {error, Reason} -> + ?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}}) + catch + E:R:S -> + ?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}}) + end; + {timeout, TimerRef, waiting_over} -> + case CurInfo of + {_PidForm, _RequestId, TimerRef} -> + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(CurInfo, {error, timeout}), + + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}}; + [MiRequest] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end + end; + [MiRequest] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end; + [MiRequest | Outs] -> + case ?agBeamPool:getv(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined}); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}}) + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}}) + end + end; + _ -> + ?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]), + overReceiveTcpData(SrvState, CliState) + end; + {tcp_closed, Socket} -> + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); + {tcp_error, Socket, Reason} -> + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}); + #miRequest{} = MiRequest -> + overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}); + _Msg -> + ?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]), + overReceiveTcpData(SrvState, CliState) + end. + +-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. +dealConnect(ServerName, HostName, Port, SocketOptions) -> + case inet:getaddrs(HostName, inet) of + {ok, IPList} -> + Ip = agMiscUtils:randomElement(IPList), + case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of + {ok, Socket} -> + {ok, Socket}; + {error, Reason} -> + ?WARN(ServerName, "connect error: ~p~n", [Reason]), + {error, Reason} + end; + {error, Reason} -> + ?WARN(ServerName, "getaddrs error: ~p~n", [Reason]), + {error, Reason} + end. + +-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. +dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, + #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) -> + case erlang:monotonic_time(millisecond) > OverTime of + true -> + %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + case RequestsOuts of + [] -> + case RequestsIns of + [] -> + {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1}); + MiRLists -> + [MiRequest | Outs] = lists:reverse(MiRLists), + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + [MiRequest] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1}); + [MiRequest | Outs] -> + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1}) + end; + _ -> + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}}; + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}), + agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError}) + end + end. + diff --git a/src/agHttpCli/vst.erl b/src/agHttpCli/vst.erl new file mode 100644 index 0000000..3361334 --- /dev/null +++ b/src/agHttpCli/vst.erl @@ -0,0 +1,438 @@ +-module(vst). + +-export([ + authorize/1, + connect/2, + request/2, + vst_maxsize/0 +]). + +assign_map(_acc@1, _key@1, _value@1) -> + case _acc@1 of + #{_key@1 := _} -> _acc@1; + #{} -> _acc@1#{_key@1 => _value@1}; + _ -> #{_key@1 => _value@1} + end. + +assign_split([<<>>, _rest@1], _value@1, _acc@1, _pattern@1) -> + _parts@1 = binary:split(_rest@1, _pattern@1), + case _acc@1 of + [_ | _] -> + [assign_split(_parts@1, _value@1, none, _pattern@1) | _acc@1]; + none -> + [assign_split(_parts@1, _value@1, none, _pattern@1)]; + _ -> _acc@1 + end; +assign_split([_key@1, _rest@1], _value@1, _acc@1, _pattern@1) -> + _parts@1 = binary:split(_rest@1, _pattern@1), + case _acc@1 of + #{_key@1 := _current@1} -> + _acc@1#{_key@1 => + assign_split(_parts@1, _value@1, _current@1, _pattern@1)}; + #{} -> + _acc@1#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)}; + _ -> + #{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)} + end; +assign_split([<<>>], nil, _acc@1, __pattern@1) -> + case _acc@1 of + [_ | _] -> _acc@1; + _ -> [] + end; +assign_split([<<>>], _value@1, _acc@1, __pattern@1) -> + case _acc@1 of + [_ | _] -> [_value@1 | _acc@1]; + none -> [_value@1]; + _ -> _acc@1 + end; +assign_split([_key@1], _value@1, _acc@1, __pattern@1) -> + assign_map(_acc@1, _key@1, _value@1). + +authorize(#{socket := _socket@1, username := _un@1, password := _pw@1} = _state@1) -> + case eVPack:encode([1, 1000, <<"plain">>, _un@1, _pw@1]) of + {ok, _auth@1} -> + case send_stream(_socket@1, build_stream(_auth@1)) of + ok -> + case recv_header(_socket@1) of + {ok, _header@1} -> + case recv_stream(_socket@1, _header@1) of + {ok, _stream@1} -> + case decode_stream(_stream@1) of + {ok, [[1, 2, 200, __headers@2] | __body@1]} -> + ok; + _@1 -> + case _@1 of + {ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> + {error, #{ + '__exception__' => true, + error_num => nil, + status => _status@1, + message => proplists:get_value(<<"errorMessage">>, _body@1), + endpoint => + case _state@1 of + #{endpoint := _@3} -> + _@3; + _@3 when erlang:is_map(_@3) -> + erlang:error({badkey, endpoint, _@3}); + _@3 -> + _@3:endpoint() + end}}; + {error, _reason@1} -> + {error, _reason@1}; + _@2 -> + erlang:error({with_clause, _@2}) + end + end; + _@1 -> + case _@1 of + {ok, + [[1, 2, _status@1, __headers@1], _body@1 | _]} -> + {error, #{ + '__exception__' => true, + error_num => nil, + status => _status@1, + message => proplists:get_value(<<"errorMessage">>, _body@1), + endpoint => + case _state@1 of + #{endpoint := _@3} -> + _@3; + _@3 when erlang:is_map(_@3) -> + erlang:error({badkey, endpoint, _@3}); + _@3 -> _@3:endpoint() + end}}; + {error, _reason@1} -> + {error, _reason@1}; + _@2 -> erlang:error({with_clause, _@2}) + end + end; + _@1 -> + case _@1 of + {ok, + [[1, 2, _status@1, __headers@1], _body@1 + | _]} -> + {error, #{ + '__exception__' => true, + error_num => nil, status => _status@1, + message => proplists:get_value(<<"errorMessage">>, _body@1), + endpoint => + case _state@1 of + #{endpoint := _@3} -> _@3; + _@3 when erlang:is_map(_@3) -> + erlang:error({badkey, + endpoint, + _@3}); + _@3 -> _@3:endpoint() + end}}; + {error, _reason@1} -> {error, _reason@1}; + _@2 -> erlang:error({with_clause, _@2}) + end + end; + _@1 -> + case _@1 of + {ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> + {error, + #{ + '__exception__' => true, error_num => nil, + status => _status@1, + message => proplists:get_value(<<"errorMessage">>, _body@1), + endpoint => + case _state@1 of + #{endpoint := _@3} -> _@3; + _@3 when erlang:is_map(_@3) -> + erlang:error({badkey, + endpoint, + _@3}); + _@3 -> _@3:endpoint() + end}}; + {error, _reason@1} -> {error, _reason@1}; + _@2 -> erlang:error({with_clause, _@2}) + end + end; + _@1 -> + case _@1 of + {ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} -> + {error, #{ + '__exception__' => true, + error_num => nil, + status => _status@1, + message => proplists:get_value(<<"errorMessage">>, _body@1), + endpoint => + case _state@1 of + #{endpoint := _@3} -> _@3; + _@3 when erlang:is_map(_@3) -> + erlang:error({badkey, endpoint, _@3}); + _@3 -> _@3:endpoint() + end}}; + {error, _reason@1} -> {error, _reason@1}; + _@2 -> erlang:error({with_clause, _@2}) + end + end. + +body_for(<<>>) -> {ok, <<>>}; +body_for(_body@1) -> + eVPack:encode(_body@1). + +body_from([]) -> nil; +body_from([_body@1]) -> _body@1; +body_from(_body@1) -> _body@1. + +build_stream(_message@1) -> + case chunk_every(_message@1, 30696) of + [_first_chunk@1 | _rest_chunks@1] -> + _n_chunks@1 = erlang:length([_first_chunk@1 + | _rest_chunks@1]), + _msg_length@1 = erlang:byte_size(_message@1) + + _n_chunks@1 * 24, + _rest_chunks@2 = + lists:reverse(lists:flodl( + fun(_n@1, _@1) -> + case _rest_chunks@1 /= [] of + true -> + [prepend_chunk(lists:nth(_n@1, _rest_chunks@1), _n@1, 0, 0, _msg_length@1) | _@1]; + false -> _@1 + end + end, [], lists:seq(1, erlang:length(_rest_chunks@1)))), + [prepend_chunk(_first_chunk@1, _n_chunks@1, 1, 0, _msg_length@1) | _rest_chunks@2]; + _only_chunk@1 -> + prepend_chunk(_only_chunk@1, 1, 1, 0, erlang:byte_size(_message@1) + 24) + end. + +chunk_every(_bytes@1, _size@1) when erlang:byte_size(_bytes@1) =< _size@1 -> + _bytes@1; +chunk_every(_bytes@1, _size@1) -> + <<_chunk@1:_size@1/binary, _rest@1/binary>> = _bytes@1, + [_chunk@1 | (chunk_every(_rest@1, _size@1))]. + +connect(#{addr := _addr@1, 'ssl?' := _ssl}, _opts@1) -> + _mod@1 = case _ssl of + _@1 when _@1 =:= false orelse _@1 =:= nil -> gen_tcp; + _ -> ssl + end, + _transport_opts@1 = case _ssl of + _@2 when _@2 =:= false orelse _@2 =:= nil -> + tcp_opts; + _ -> ssl_opts + end, + _transport_opts@2 = proplists:get_value(_transport_opts@1, _opts@1, []), + _connect_timeout@1 = proplists:get(connect_timeout, _opts@1, 5000), + _options@1 = lists:merge(_transport_opts@2, [{packet, raw}, {mode, binary}, {active, false}]), + case _mod@1:connect(addr_for(_addr@1), port_for(_addr@1), _options@1, _connect_timeout@1) of + {ok, _port@1} -> + case _mod@1:send(_port@1, <<"VST/1.1\r\n\r\n">>) of + ok -> {ok, {_mod@1, _port@1}}; + _@3 -> _@3 + end; + _@3 -> _@3 + end. + +decode_pair({_key@1, _value@1}, _acc@1) -> + case case _key@1 /= <<>> of + false -> false; + true -> binary:last(_key@1) == 93 + end + of + false -> assign_map(_acc@1, _key@1, _value@1); + true -> + _subkey@1 = binary:part(_key@1, + 0, + erlang:byte_size(_key@1) - 1), + assign_split(binary:split(_subkey@1, <<"[">>), + _value@1, + _acc@1, + binary:compile_pattern(<<"][">>)) + end. + +decode_stream(_@1) -> decode_stream(_@1, []). + +decode_stream(<<>>, _acc@1) -> {ok, _acc@1}; +decode_stream(_stream@1, _acc@1) -> + case eVPack:decode(_stream@1) of + {ok, {_term@1, _rest@1}} -> + decode_stream(_rest@1, erlang:'++'(_acc@1, [_term@1])); + {ok, _term@2} -> {ok, erlang:'++'(_acc@1, [_term@2])}; + {error, _reason@1} -> {error, _reason@1} + end. + +headers_for(#{} = _headers@1) -> _headers@1; +headers_for(_headers@1) + when erlang:is_list(_headers@1) -> + maps:from_list(_headers@1). + +method_for(delete) -> 0; +method_for(get) -> 1; +method_for(post) -> 2; +method_for(put) -> 3; +method_for(head) -> 4; +method_for(patch) -> 5; +method_for(options) -> 6; +method_for(_) -> -1. + +port_for({unix, __path@1}) -> 0; +port_for({tcp, __host@1, _port@1}) -> _port@1. + +prepend_chunk(_chunk@1, _chunk_n@1, _is_first@1, + _msg_id@1, _msg_length@1) -> + <<(24 + erlang:byte_size(_chunk@1)):32/integer-little, + (binary:decode_unsigned(<<_chunk_n@1:31/integer, + _is_first@1:1/integer>>, + little)):32/integer, + _msg_id@1:64/integer-little, + _msg_length@1:64/integer-little, _chunk@1/binary>>. + +query_for(nil) -> #{}. + +recv_chunk({_mod@1, _port@1}, _chunk_length@1) -> + _mod@1:recv(_port@1, _chunk_length@1 - 24). + +recv_header({_mod@1, _port@1}) -> + case _mod@1:recv(_port@1, 24) of + {ok, + <<_chunk_length@1:32/integer-little, + _chunk_x@1:32/integer, _msg_id@1:64/integer-little, + _msg_length@1:64/integer-little>>} -> + <<_chunk_n@1:31/integer, _is_first@1:1/integer>> = + <<_chunk_x@1:32/integer-little>>, + {ok, + [_chunk_length@1, + _chunk_n@1, + _is_first@1, + _msg_id@1, + _msg_length@1]}; + {error, _reason@1} -> {error, _reason@1} + end. + +recv_stream(_socket@1, + [_chunk_length@1, 1, 1, __msg_id@1, __msg_length@1]) -> + recv_chunk(_socket@1, _chunk_length@1); +recv_stream(_socket@1, + [_chunk_length@1, + _n_chunks@1, + 1, + __msg_id@1, + __msg_length@1]) -> + case recv_chunk(_socket@1, _chunk_length@1) of + {ok, _buffer@1} -> + case recv_stream(_socket@1, _n_chunks@1, _buffer@1) of + {ok, _stream@1} -> {ok, _stream@1}; + _@1 -> _@1 + end; + _@1 -> _@1 + end. + +recv_stream(_socket@1, _n_chunks@1, _buffer@1) -> + lists:reduce_while(lists:seq(1, _n_chunks@1 - 1), + _buffer@1, + fun(_n@1, _buffer@2) -> + case recv_header(_socket@1) of + {ok, [_chunk_length@1, _, _, _, _]} -> + case recv_chunk(_socket@1, _chunk_length@1) of + {ok, _chunk@1} -> + case _n@1 == _n_chunks@1 - 1 of + false -> + {cont, <<_buffer@2/binary, _chunk@1/binary>>}; + true -> + {halt, {ok, <<_buffer@2/binary, _chunk@1/binary>>}} + end; + _@1 -> + case _@1 of + {error, _reason@1} -> + {halt, {error, _reason@1}}; + _@2 -> + erlang:error({with_clause, _@2}) + end + end; + _@1 -> + case _@1 of + {error, _reason@1} -> + {halt, {error, _reason@1}}; + _@2 -> + erlang:error({with_clause, _@2}) + end + end + end). + +request(#{method := _method@1, path := _path@1, headers := _headers@1, body := _body@1}, #{socket := _socket@1, database := _database@1} = _state@1) -> + #{path := _path@2, query := _query@1} = http_uri:parse(_path@1), + {_database@3, _path@4} = + case _path@2 of + <<"/_db/", _rest@1/binary>> -> + [_database@2, _path@3] = binary:split(_rest@1, <<"/">>), + {_database@2, <<"/", _path@3/binary>>}; + _ -> + {case _database@1 of + _@1 when _@1 =:= false orelse _@1 =:= nil -> + <<>>; + _@2 -> _@2 + end, _path@2} + end, + _request@1 = [1, 1, _database@3, method_for(_method@1), _path@4, query_for(_query@1), headers_for(_headers@1)], + case eVPack:encode(_request@1) of + {ok, _request@2} -> + case body_for(_body@1) of + {ok, _body@2} -> + case send_stream(_socket@1, build_stream(<<_request@2/binary, _body@2/binary>>)) + of + ok -> + case recv_header(_socket@1) of + {ok, _header@1} -> + case recv_stream(_socket@1, _header@1) of + {ok, _stream@1} -> + case decode_stream(_stream@1) of + {ok, [[1, 2, _status@1, _headers@2] | _body@3]} -> + {ok, #{status => _status@1, headers => _headers@2, body => body_from(_body@3)}, _state@1}; + _@3 -> + case _@3 of + {error, closed} -> + {error, noproc, _state@1}; + {error, _reason@1} -> + {error, _reason@1, _state@1}; + _@4 -> + erlang:error({with_clause, _@4}) + end + end; + _@3 -> + case _@3 of + {error, closed} -> + {error, noproc, _state@1}; + {error, _reason@1} -> + {error, _reason@1, _state@1}; + _@4 -> + erlang:error({with_clause, _@4}) + end + end; + _@3 -> + case _@3 of + {error, closed} -> + {error, noproc, _state@1}; + {error, _reason@1} -> + {error, _reason@1, _state@1}; + _@4 -> erlang:error({with_clause, _@4}) + end + end; + _@3 -> + case _@3 of + {error, closed} -> {error, noproc, _state@1}; + {error, _reason@1} -> + {error, _reason@1, _state@1}; + _@4 -> erlang:error({with_clause, _@4}) + end + end; + _@3 -> + case _@3 of + {error, closed} -> {error, noproc, _state@1}; + {error, _reason@1} -> {error, _reason@1, _state@1}; + _@4 -> erlang:error({with_clause, _@4}) + end + end; + _@3 -> + case _@3 of + {error, closed} -> {error, noproc, _state@1}; + {error, _reason@1} -> {error, _reason@1, _state@1}; + _@4 -> erlang:error({with_clause, _@4}) + end + end. + +send_stream(Socket, IoData) -> + tcp:send(Socket, IoData). + +vst_maxsize() -> 30720.