Procházet zdrojové kódy

vst相关还原

master
SisMaker před 4 roky
rodič
revize
95b6ba4e41
6 změnil soubory, kde provedl 1 přidání a 1391 odebrání
  1. +0
    -286
      VelocyStream.md
  2. +0
    -188
      VelocyStream_zh.md
  3. +1
    -2
      rebar.config
  4. +0
    -77
      src/agHttpCli/agVstAgencyExm.erl
  5. +0
    -400
      src/agHttpCli/agVstAgencyIns.erl
  6. +0
    -438
      src/agHttpCli/vst.erl

+ 0
- 286
VelocyStream.md Zobrazit soubor

@ -1,286 +0,0 @@
Client / Server Communication (VST 1.1)
=======================================
Version 1.1.0 of 23 November 2016
HTTP
----
Use VelocyPack as body. Content-Type is `"application/vpack"`
Binary Protocol
---------------
This is not a request / response protocol. It is symmetric (in
principle). Messages can be sent back and forth, pipelined, multiplexed,
uni-directional or bi-directional.
It is possible that a message generates
- no response
- exactly one response
- multiple responses
The VelocyStream does **not** impose or specify or require one of the
above behaviors. The application must define a behavior in general or on
a per-request basis, see below. The server and client must then
implement that behavior.
### Message vs. Chunk
The consumer (client or server) will deal with messages. A message
consists of one or more VelocyPacks (or in some cases of certain parts
of binary data). How many VelocyPacks are part of a message is
completely application dependent, see below for ArangoDB.
It is possible that the messages are very large. As messages can be
multiplexed over one connection, large messages need to be split into
chunks. The sender/receiver class will accept a vector of VelocyPacks,
split them into chunks, send these chunks over the wire, assemble these
chunks, generates a vector of VelocyPacks and passes this to the
consumer.
### Chunks
In order to allow reassemble chunks, each package is prefixed by a small
header. A chunk is always at least 24 bytes long. The byte order is
ALWAYS little endian. The format of a chunk is the following, regardless
on whether it is the first in a message or a subsequent one:
| name | type | description |
| --------------- | ------------------------- | --- |
| length | uint32\_t | total length in bytes of the current chunk, including this header |
| chunkX | uint32\_t | chunk/isFirstChunk (upper 31bits/lowest bit), details see below |
| messageId | uint64\_t | a unique identifier, it is the responsibility of the sender to generate such an identifier (zero is reserved for not set ID) |
| messageLength | uint64\_t | the total size of the message. |
| Binary data | binary data blob | size b1 |
Clarification: "chunk" and "isFirstChunk" are combined into an unsigned
32bit value. Therefore it will be encoded as
uint32_t chunkX
and extracted as
chunk = chunkX >> 1
isFirstChunk = chunkX & 0x1
For the first chunk of a message, the low bit of the second uint32\_t is
set, for all subsequent ones it is reset. In the first chunk of a
message, the number "chunk" is the total number of chunks in the
message, in all subsequent chunks, the number "chunk" is the current
number of this chunk.
The total size of the data package is (24 + b1) bytes. This number is
stored in the length field. If one needs to send messages larger than
UINT32\_MAX, then these messages must be chunked. In general it is a
good idea to restrict the maximal size to a few megabytes.
**Notes:**
When sending a (small) message, it is important (for performance reasons)
to ensure that only one TCP
packet is sent. For example, by using sendmmsg under Linux
([*https://blog.cloudflare.com/how-to-receive-a-million-packets/*](https://blog.cloudflare.com/how-to-receive-a-million-packets/))
Implementors should nevertheless be aware that in TCP/IP one cannot
enforce this and so implementations must always be aware that some part
of the network stack can split packets and the payload might arrive in
multiple parts!
ArangoDB
========
### Request / Response
For an ArangoDB client, the request is of the following format, the
array is a VelocyPack array:
[
/* 0 - version: */ 1, // [int]
/* 1 - type: */ 1, // [int] 1=Req, 2=Res,..
/* 2 - database: */ "test", // [string]
/* 3 - requestType: */ 1, // [int] 0=Delete, ...
/* 4 - request: */ "/_api/collection", // [string\]
/* 5 - parameter: */ { force: true }, // [[string]->[string]]
/* 6 - meta: */ { x-arangodb: true } // [[string]->[string]]
]
Body (binary data)
If database is missing (entry is `null`), then "\_system" is assumed.
`type`:
1 = Request
2 = Response (final response for this message id)
3 = Response (but at least one more response will follow)
1000 = Authentication
`requestType`:
0 = DELETE
1 = GET
2 = POST
3 = PUT
4 = HEAD (not used in VPP)
5 = PATCH
6 = OPTIONS (not used in VPP)
For example:
The HTTP request
http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3
With header:
X-ArangoDB-Async: true
is equivalent to
[
1, // version
1, // type
"test", // database
1, // requestType GET
"/_admin/echo", // request path
{ // parameters
a: 1,
b: 2,
c: [ 1, 3 ]
},
{ // meta
x-arangodb-async: true
}
]
The request is a message beginning with one VelocyPack. This VelocyPack
always contains the header fields, parameters and request path. If the
meta field does not contain a content type, then the default
`"application/vpack"` is assumed and the body will be one or multiple
VelocyPack object.
The response will be
[
1, // 0 - version
2 or 3, // 1 - type
400, // 2 - responseCode
{ etag: "1234" } // 3 - meta: [[str]->[str]]
]
Body (binary data)
Request can be pipelined or mixed. The responses are mapped using the
"messageId" in the header. It is the responsibility of the **sender** to
generate suitable "messageId" values.
The default content-type is `"application/vpack"`.
### Authentication
A connection can be authenticated with the following message:
[
1, // version
1000, // type
"plain", // encryption
"admin", // user
"plaintext", // password
]
or
[
1, // version
1000, // type
"jwt", // encryption
"abcd..." // token
]
The response is
{ "error": false }
if successful or
{
"error": true,
"errorMessage": "MESSAGE",
"errorCode": CODE
}
if not successful, and in this case the connection is closed by the server.
One can acquire a JWT token in the same way as with HTTP using the
open, unauthenticated route `/_open/auth` with the same semantics as
in the HTTP version. In this way, the complete authentication can be
done in a single session via JWT.
### Content-Type and Accept
In general the content-type will be VPP, that is the body is an object
stored as VelocyPack.
Sometimes it is necessary to respond with unstructured data, like text,
css or html. The body will be a VelocyPack object containing just a
binary attribute and the content-type will be set accordingly.
The rules are as follows.
#### Http
Request: Content-Type
- `"application/json"`: the body contains the JSON string representation
- `"application/vpack"`: the body contains a velocy pack
There are some handler that allow lists of JSON (seperared by newline).
In this case we also allow multiple velocy packs without any separator.
Request: Accept
- `"application/json"`: send a JSON string representation in the body,
if possible
- `"application/vpack"`: send velocy pack in the body, if possible
If the request asked for `"application/json"` or `"application/vpack"` and
the handler produces something else (i.e. `"application/html"`), then the
accept is ignored.
If the request asked `"application/json"` and the handler produces
`"application/vpack"`, then the VPACK is converted into JSON.
If the request asked `"application/vpack"` and the handler produces
"application/json", then the JSON is converted into VPACK.
#### VPP
Similar to HTTP with the exception: the "Accept" header is not supported
and `"application/json"` will always be converted into
"application/vpack". This means that the body contains one or more
velocy-packs. In general it will contain one - notable exception being
the import.
If the handler produces something else (i.e. `"application/html"`), then
The body will be a binary blob (instead of a velocy-pack) and the
content-type will be set accordingly.
The first bytes sent after a connection (the "client" side - even if the
program is bi-directional, there is a server listening to a port and a
client connecting to a port) are
VST/1.1\r\n\r\n
(11 Bytes)

+ 0
- 188
VelocyStream_zh.md Zobrazit soubor

@ -1,188 +0,0 @@
Client / Server Communication (VST 1.1)
=======================================
Version 1.1.0 of 23 November 2016
# HTTP
使用VelocyPack作为身体。内容类型为"application/vpack"
Binary Protocol
---------------
这不是请求/响应协议。它是对称的(原则上)。消息可以来回发送,流水线,多路复用,单向或双向。
可能会生成一条消息
没有反应
只是一个回应
多重回应
该VelocyStream并没有强加或指定或要求上述行为之一。
应用程序必须在一般情况下或根据每个请求定义行为,
请参见下文。然后,服务器和客户端必须实现该行为。
### Message vs. Chunk
使用者(客户端或服务器)将处理消息。一条消息包含一个或多个VelocyPack(在某些情况下是二进制数据的某些部分)。
消息中有多少VelocyPacks完全取决于应用程序,请参阅下文了解ArangoDB。
消息可能很大。由于可以通过一个连接对消息进行多路复用,因此需要将大消息拆分为多个块。
发送者/接收者类将接受一个VelocyPacks向量,将其分割成块,通过导线发送这些块,组装这些块,
生成一个VelocyPacks向量并将其传递给消费者。
### Chunks
为了允许重组块,每个程序包都以一个 header 作为前缀。块始终至少为24个字节长。字节顺序始终是小端。
块的格式如下,无论它是消息中的第一个消息还是随后的消息:
名称 类型 描述
length uint32_t 当前块(包括此标头)的总长度(以字节为单位)
chunkX uint32_t chunk / isFirstChunk(高31位/最低位),详细信息请参见下文
messageId uint64_t 唯一标识符,发送方有责任生成这样的标识符(zero is reserved for not set ID)
messageLength uint64_t the total size of the message.
Binary data binary data blob size b1 |
声明:"chunk" and "isFirstChunk" are combined into an unsigned 32bit value.
Therefore it will be encoded as
uint32_t chunkX and extracted as
chunk = chunkX >> 1
isFirstChunk = chunkX & 0x1
对于消息的第一块,设置第二个uint32_t的低位,对于所有后续消息,将其复位。
在消息的第一个块中,数字 chunk 是消息中的块总数,在所有后续块中,数字 chunk 是该块的当前number。
数据包的总大小为(24 + b1)字节。此数字存储在length字段中。
如果需要发送大于UINT32_MAX的消息,则必须将这些消息分块。通常,最好将最大大小限制为几兆字节。
### **Notes:**
发送(小)消息时,(出于性能原因)确保仅发送一个TCP数据包非常重要。
例如,通过在Linux下使用sendmmsg
([https://blog.cloudflare.com/how-to-receive-a-million-packets](https://blog.cloudflare.com/how-to-receive-a-million-packets/))
但是,实现者应该意识到,在TCP / IP中不能强制执行此操作,因此实现者必须始终意识到,
网络堆栈的某些部分可以拆分数据包,并且有效负载可能分成多个部分!
ArangoDB
========
### Request / Response
对于ArangoDB客户端,请求的格式如下,该数组是VelocyPack数组:
[
/* 0 - version: */ 1, // [int]
/* 1 - type: */ 1, // [int] 1=Req, 2=Res,..
/* 2 - database: */ "test", // [string]
/* 3 - requestType: */ 1, // [int] 0=Delete, ...
/* 4 - request: */ "/_api/collection", // [string\]
/* 5 - parameter: */ { force: true }, // [[string]->[string]] http的请求参数列表
/* 6 - meta: */ { x-arangodb: true } // [[string]->[string]] http的header
]
Body (binary data)
如果数据库未设置(entry is `null`),则数据库为“ _system”。
#### type:
1 = Request
2 = Response (final response for this message id)
3 = Response (but at least one more response will follow)
1000 = Authentication
#### requestType:
0 = DELETE
1 = GET
2 = POST
3 = PUT
4 = HEAD (not used in VPP)
5 = PATCH
6 = OPTIONS (not used in VPP)
### For example:
HTTP请求
http://localhost:8529/_db/test/_admin/echo?a=1&b=2&c[]=1&c[]=3
With header::
X-ArangoDB-Async: true
is equivalent to
[
1, // version
1, // type
"test", // database
1, // requestType GET
"/_admin/echo", // request path
{ // parameters
a: 1,
b: 2,
c: [ 1, 3 ]
},
{ // meta
x-arangodb-async: true
}
]
该请求是一条以VelocyPack开头的消息。这个VelocyPack始终包含标题字段,参数和请求路径。
如果meta字段不包含内容类型,则采用默认值 "application/vpack",并且正文将是一个或多个VelocyPack对象。
#### The response will be
[
1, // 0 - version
2 or 3, // 1 - type
400, // 2 - responseCode
{ etag: "1234" } // 3 - meta: [[str]->[str]]
]
#### Body (binary data)
请求可以通过管道传输或混合。使用标头中的“ messageId”映射响应。发送者有责任生成合适的“ messageId”值。
默认内容类型为"application/vpack"。
### Authentication
A connection can be authenticated with the following message:
[
1, // version
1000, // type
"plain", // encryption
"admin", // user
"plaintext", // password
]
or
[
1, // version
1000, // type
"jwt", // encryption
"abcd..." // token
]
The response is
{ "error": false }
if successful or
{
"error": true,
"errorMessage": "MESSAGE",
"errorCode": CODE
}
如果不成功,则在这种情况下服务器将关闭连接。可以使用/_open/auth与HTTP版本相同的语义,使用未经身份验证的开放式路由,以与HTTP相同的方式获取JWT令牌。这样,可以通过JWT在单个会话中完成完整的身份验证。
### Content-Type and Accept
通常,内容类型将是VPP,即主体是存储为VelocyPack的对象。
有时有必要使用非结构化数据(例如文本,css或html)进行响应。主体将是一个仅包含二进制属性的VelocyPack对象,并将相应地设置content-type。
规则如下。
#### Http
Request: Content-Type
"application/json"`: the body contains the JSON string representation
`"application/vpack"`: the body contains a velocy pack
有一些处理程序允许JSON列表(由换行符分隔)。在这种情况下,我们还允许不使用任何分隔符的多个速度包。
Request: Accept
"application/json":如果可能,在正文中发送JSON字符串表示形式
"application/vpack":如果可能的话,在体内发送速度包
如果请求是"application/json"或"application/vpack"处理程序产生了其他请求(即"application/html"),那么将忽略接受。
如果请求被请求"application/json"并且处理程序产生 "application/vpack",则VPACK将转换为JSON。
如果请求被请求"application/vpack",并且处理程序生成“ application / json”,则JSON将转换为VPACK。
#### VPP
与HTTP相似,不同之处在于:不支持“ Accept”标头,并且"application/json"始终将其转换为“ application / vpack”。这意味着主体包含一个或多个速度包。通常,它将包含一个-值得注意的例外是导入。
如果处理程序产生了其他东西(即"application/html"),则主体将是一个二进制blob(而不是一个velocy-pack),并且将相应地设置content-type。
连接后发送的第一个字节(“客户端”端-即使程序是双向的,也有服务器在监听端口,而客户端在连接端口)
VST/1.1\r\n\r\n
(11 Bytes)

+ 1
- 2
rebar.config Zobrazit soubor

@ -1,8 +1,7 @@
{erl_opts, [{i, "include"}]}.
{edoc_opts, [{preprocess, true}]}.
{deps, [
{eVPack, {git, "http://192.168.0.88:53000/SisMaker/eVPack.git", {branch, master}}},
{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.5"}}}
{jiffy, {git, "https://github.com/davisp/jiffy.git", {tag, "1.0.6"}}}
%% {jsx, {git, "https://github.com/talentdeficit/jsx.git", {tag, "v3.0.0"}}}
]}.

+ 0
- 77
src/agHttpCli/agVstAgencyExm.erl Zobrazit soubor

@ -1,77 +0,0 @@
-module(agVstAgencyExm).
-compile(inline).
-compile({inline_size, 128}).
-export([
start_link/3
, init_it/3
, system_code_change/4
, system_continue/3
, system_get_state/1
, system_terminate/4
]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm start %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec start_link(module(), term(), [proc_lib:spawn_option()]) -> {ok, pid()}.
start_link(ServerName, Args, SpawnOpts) ->
proc_lib:start_link(?MODULE, init_it, [ServerName, self(), Args], infinity, SpawnOpts).
init_it(ServerName, Parent, Args) ->
case safeRegister(ServerName) of
true ->
process_flag(trap_exit, true),
moduleInit(Parent, Args);
{false, Pid} ->
proc_lib:init_ack(Parent, {error, {alreadyStarted, Pid}})
end.
-spec system_code_change(term(), module(), undefined | term(), term()) -> {ok, term()}.
system_code_change(MiscState, _Module, _OldVsn, _Extra) ->
{ok, MiscState}.
-spec system_continue(pid(), [], {module(), term(), term()}) -> ok.
system_continue(_Parent, _Debug, {Parent, SrvState, CliState}) ->
loop(Parent, SrvState, CliState).
-spec system_get_state(term()) -> {ok, term()}.
system_get_state({_Parent, SrvState, _CliState}) ->
{ok, SrvState}.
-spec system_terminate(term(), pid(), [], term()) -> none().
system_terminate(Reason, _Parent, _Debug, {_Parent, SrvState, CliState}) ->
terminate(Reason, SrvState, CliState).
safeRegister(ServerName) ->
try register(ServerName, self()) of
true -> true
catch
_:_ -> {false, whereis(ServerName)}
end.
moduleInit(Parent, Args) ->
case agTcpAgencyIns:init(Args) of
{ok, SrvState, CliState} ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(Parent, SrvState, CliState);
{stop, Reason} ->
proc_lib:init_ack(Parent, {error, Reason}),
exit(Reason)
end.
loop(Parent, SrvState, CliState) ->
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {Parent, SrvState, CliState});
{'EXIT', Parent, Reason} ->
terminate(Reason, SrvState, CliState);
Msg ->
{ok, NewSrvState, NewCliState} = agVstAgencyIns:handleMsg(Msg, SrvState, CliState),
loop(Parent, NewSrvState, NewCliState)
end.
terminate(Reason, SrvState, CliState) ->
agTcpAgencyIns:terminate(Reason, SrvState, CliState),
exit(Reason).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% genExm end %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

+ 0
- 400
src/agHttpCli/agVstAgencyIns.erl Zobrazit soubor

@ -1,400 +0,0 @@
-module(agVstAgencyIns).
-include("agHttpCli.hrl").
-include("erlArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
-export([
%% Inner Behavior API
init/1
, handleMsg/3
, terminate/3
]).
-spec init(term()) -> no_return().
init({PoolName, AgencyName, #agencyOpts{reconnect = Reconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}}) ->
ReconnectState = agAgencyUtils:initReconnectState(Reconnect, Min, Max),
self() ! ?miDoNetConnect,
{ok, #srvState{poolName = PoolName, serverName = AgencyName, rn = binary:compile_pattern(<<"\r\n">>), rnrn = binary:compile_pattern(<<"\r\n\r\n">>), reconnectState = ReconnectState}, #cliState{backlogSize = BacklogSize}}.
-spec handleMsg(term(), srvState(), cliState()) -> {ok, term(), term()}.
handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem} = MiRequest,
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, backlogSize = BacklogSize, requestsIns = RequestsIns, status = Status} = CliState) ->
case Socket of
undefined ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, noSocket}),
{ok, SrvState, CliState};
_ ->
case BacklogNum >= BacklogSize of
true ->
?WARN(ServerName, ":backlog full curNum:~p Total: ~p ~n", [BacklogNum, BacklogSize]),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, backlogFull}),
{ok, SrvState, CliState};
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, {socketSendError, Reason}}),
agAgencyUtils:dealClose(SrvState, CliState, {error, {socketSendError, Reason}})
end;
_ ->
{ok, SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1}}
end
end
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
{ok, SrvState, CliState#cliState{recvState = NewRecvState}};
{error, Reason} ->
?WARN(ServerName, "handle tcp data error: ~p ~p ~n", [Reason, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(ServerName, "handle tcp data crash: ~p:~p~n~p~n ~p ~n ", [E, R, S, CurInfo]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, agencyHandledataError})
end;
handleMsg({timeout, TimerRef, waiting_over},
#srvState{socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = {FromPid, RequestId, TimerRef}} = CliState) ->
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% tcp tcp收到该次超时数据
gen_tcp:close(Socket),
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
handleMsg(?miDoNetConnect,
#srvState{poolName = PoolName, serverName = ServerName, reconnectState = ReconnectState} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts} = CliState) ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{host = Host, port = Port, hostname = HostName, dbName = DbName, userPassword = UserPassword, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, Socket} ->
NewReconnectState = agAgencyUtils:resetReconnectState(ReconnectState),
%% buff之类状态数据
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState#srvState{userPassWord = UserPassword, dbName = DbName, host = Host, reconnectState = NewReconnectState, socket = Socket}, CliState#cliState{status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState#srvState{socket = Socket, reconnectState = NewReconnectState}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined})
end;
{error, _Reason} ->
agAgencyUtils:reconnectTimer(SrvState, CliState)
end;
_Ret ->
?WARN(ServerName, "deal connect not found agBeamPool:getv(~p) ret ~p is error ~n", [PoolName, _Ret])
end;
handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
?WARN(ServerName, "unknown msg: ~p~n", [Msg]),
{ok, SrvState, CliState}.
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{socket = Socket} = SrvState,
CliState) ->
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overAllWork(SrvState, #cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, status = Status} = CliState) ->
case Status of
leisure ->
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = []});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = []});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs})
end;
_ ->
overReceiveTcpData(SrvState, CliState)
end.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.
-spec overReceiveTcpData(srvState(), cliState()) -> {ok, srvState(), cliState()}.
overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsIns = RequestsIns, requestsOuts = RequestsOuts, recvState = RecvState} = CliState) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, {StatusCode, Body, Headers}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcpDataError, Reason}})
catch
E:R:S ->
?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledataError}})
end;
{timeout, TimerRef, waiting_over} ->
case CurInfo of
{_PidForm, _RequestId, TimerRef} ->
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}};
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsIns = [], requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
[MiRequest] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = [], status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end;
[MiRequest | Outs] ->
case ?agBeamPool:getv(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, CliState#cliState{requestsOuts = Outs, status = leisure, curInfo = undefined, recvState = undefined});
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {newTcpConnectErrorOver, _Reason}})
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {notFoundPoolName, PoolName}})
end
end;
_ ->
?WARN(overReceiveTcpData, "receive waiting_over TimerRef not match: ~p~n", [TimerRef]),
overReceiveTcpData(SrvState, CliState)
end;
{tcp_closed, Socket} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
{tcp_error, Socket, Reason} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}});
#miRequest{} = MiRequest ->
overReceiveTcpData(SrvState, CliState#cliState{requestsIns = [MiRequest | RequestsIns], backlogNum = BacklogNum + 1});
_Msg ->
?WARN(overReceiveTcpData, "receive unexpect msg: ~p~n", [_Msg]),
overReceiveTcpData(SrvState, CliState)
end.
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
Ip = agMiscUtils:randomElement(IPList),
case gen_tcp:connect(Ip, Port, SocketOptions, ?DEFAULT_CONNECT_TIMEOUT) of
{ok, Socket} ->
{ok, Socket};
{error, Reason} ->
?WARN(ServerName, "connect error: ~p~n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
?WARN(ServerName, "getaddrs error: ~p~n", [Reason]),
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsIns = RequestsIns, requestsOuts = RequestsOuts, backlogNum = BacklogNum} = CliState) ->
case erlang:monotonic_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case RequestsOuts of
[] ->
case RequestsIns of
[] ->
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], backlogNum = BacklogNum - 1});
MiRLists ->
[MiRequest | Outs] = lists:reverse(MiRLists),
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsIns = [], requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
[MiRequest] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = [], backlogNum = BacklogNum - 1});
[MiRequest | Outs] ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOuts = Outs, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?AgHead, status = waiting, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socketSendError}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socketSendError})
end
end.

+ 0
- 438
src/agHttpCli/vst.erl Zobrazit soubor

@ -1,438 +0,0 @@
-module(vst).
-export([
authorize/1,
connect/2,
request/2,
vst_maxsize/0
]).
assign_map(_acc@1, _key@1, _value@1) ->
case _acc@1 of
#{_key@1 := _} -> _acc@1;
#{} -> _acc@1#{_key@1 => _value@1};
_ -> #{_key@1 => _value@1}
end.
assign_split([<<>>, _rest@1], _value@1, _acc@1, _pattern@1) ->
_parts@1 = binary:split(_rest@1, _pattern@1),
case _acc@1 of
[_ | _] ->
[assign_split(_parts@1, _value@1, none, _pattern@1) | _acc@1];
none ->
[assign_split(_parts@1, _value@1, none, _pattern@1)];
_ -> _acc@1
end;
assign_split([_key@1, _rest@1], _value@1, _acc@1, _pattern@1) ->
_parts@1 = binary:split(_rest@1, _pattern@1),
case _acc@1 of
#{_key@1 := _current@1} ->
_acc@1#{_key@1 =>
assign_split(_parts@1, _value@1, _current@1, _pattern@1)};
#{} ->
_acc@1#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)};
_ ->
#{_key@1 => assign_split(_parts@1, _value@1, none, _pattern@1)}
end;
assign_split([<<>>], nil, _acc@1, __pattern@1) ->
case _acc@1 of
[_ | _] -> _acc@1;
_ -> []
end;
assign_split([<<>>], _value@1, _acc@1, __pattern@1) ->
case _acc@1 of
[_ | _] -> [_value@1 | _acc@1];
none -> [_value@1];
_ -> _acc@1
end;
assign_split([_key@1], _value@1, _acc@1, __pattern@1) ->
assign_map(_acc@1, _key@1, _value@1).
authorize(#{socket := _socket@1, username := _un@1, password := _pw@1} = _state@1) ->
case eVPack:encode([1, 1000, <<"plain">>, _un@1, _pw@1]) of
{ok, _auth@1} ->
case send_stream(_socket@1, build_stream(_auth@1)) of
ok ->
case recv_header(_socket@1) of
{ok, _header@1} ->
case recv_stream(_socket@1, _header@1) of
{ok, _stream@1} ->
case decode_stream(_stream@1) of
{ok, [[1, 2, 200, __headers@2] | __body@1]} ->
ok;
_@1 ->
case _@1 of
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} ->
{error, #{
'__exception__' => true,
error_num => nil,
status => _status@1,
message => proplists:get_value(<<"errorMessage">>, _body@1),
endpoint =>
case _state@1 of
#{endpoint := _@3} ->
_@3;
_@3 when erlang:is_map(_@3) ->
erlang:error({badkey, endpoint, _@3});
_@3 ->
_@3:endpoint()
end}};
{error, _reason@1} ->
{error, _reason@1};
_@2 ->
erlang:error({with_clause, _@2})
end
end;
_@1 ->
case _@1 of
{ok,
[[1, 2, _status@1, __headers@1], _body@1 | _]} ->
{error, #{
'__exception__' => true,
error_num => nil,
status => _status@1,
message => proplists:get_value(<<"errorMessage">>, _body@1),
endpoint =>
case _state@1 of
#{endpoint := _@3} ->
_@3;
_@3 when erlang:is_map(_@3) ->
erlang:error({badkey, endpoint, _@3});
_@3 -> _@3:endpoint()
end}};
{error, _reason@1} ->
{error, _reason@1};
_@2 -> erlang:error({with_clause, _@2})
end
end;
_@1 ->
case _@1 of
{ok,
[[1, 2, _status@1, __headers@1], _body@1
| _]} ->
{error, #{
'__exception__' => true,
error_num => nil, status => _status@1,
message => proplists:get_value(<<"errorMessage">>, _body@1),
endpoint =>
case _state@1 of
#{endpoint := _@3} -> _@3;
_@3 when erlang:is_map(_@3) ->
erlang:error({badkey,
endpoint,
_@3});
_@3 -> _@3:endpoint()
end}};
{error, _reason@1} -> {error, _reason@1};
_@2 -> erlang:error({with_clause, _@2})
end
end;
_@1 ->
case _@1 of
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} ->
{error,
#{
'__exception__' => true, error_num => nil,
status => _status@1,
message => proplists:get_value(<<"errorMessage">>, _body@1),
endpoint =>
case _state@1 of
#{endpoint := _@3} -> _@3;
_@3 when erlang:is_map(_@3) ->
erlang:error({badkey,
endpoint,
_@3});
_@3 -> _@3:endpoint()
end}};
{error, _reason@1} -> {error, _reason@1};
_@2 -> erlang:error({with_clause, _@2})
end
end;
_@1 ->
case _@1 of
{ok, [[1, 2, _status@1, __headers@1], _body@1 | _]} ->
{error, #{
'__exception__' => true,
error_num => nil,
status => _status@1,
message => proplists:get_value(<<"errorMessage">>, _body@1),
endpoint =>
case _state@1 of
#{endpoint := _@3} -> _@3;
_@3 when erlang:is_map(_@3) ->
erlang:error({badkey, endpoint, _@3});
_@3 -> _@3:endpoint()
end}};
{error, _reason@1} -> {error, _reason@1};
_@2 -> erlang:error({with_clause, _@2})
end
end.
body_for(<<>>) -> {ok, <<>>};
body_for(_body@1) ->
eVPack:encode(_body@1).
body_from([]) -> nil;
body_from([_body@1]) -> _body@1;
body_from(_body@1) -> _body@1.
build_stream(_message@1) ->
case chunk_every(_message@1, 30696) of
[_first_chunk@1 | _rest_chunks@1] ->
_n_chunks@1 = erlang:length([_first_chunk@1
| _rest_chunks@1]),
_msg_length@1 = erlang:byte_size(_message@1) +
_n_chunks@1 * 24,
_rest_chunks@2 =
lists:reverse(lists:flodl(
fun(_n@1, _@1) ->
case _rest_chunks@1 /= [] of
true ->
[prepend_chunk(lists:nth(_n@1, _rest_chunks@1), _n@1, 0, 0, _msg_length@1) | _@1];
false -> _@1
end
end, [], lists:seq(1, erlang:length(_rest_chunks@1)))),
[prepend_chunk(_first_chunk@1, _n_chunks@1, 1, 0, _msg_length@1) | _rest_chunks@2];
_only_chunk@1 ->
prepend_chunk(_only_chunk@1, 1, 1, 0, erlang:byte_size(_message@1) + 24)
end.
chunk_every(_bytes@1, _size@1) when erlang:byte_size(_bytes@1) =< _size@1 ->
_bytes@1;
chunk_every(_bytes@1, _size@1) ->
<<_chunk@1:_size@1/binary, _rest@1/binary>> = _bytes@1,
[_chunk@1 | (chunk_every(_rest@1, _size@1))].
connect(#{addr := _addr@1, 'ssl?' := _ssl}, _opts@1) ->
_mod@1 = case _ssl of
_@1 when _@1 =:= false orelse _@1 =:= nil -> gen_tcp;
_ -> ssl
end,
_transport_opts@1 = case _ssl of
_@2 when _@2 =:= false orelse _@2 =:= nil ->
tcp_opts;
_ -> ssl_opts
end,
_transport_opts@2 = proplists:get_value(_transport_opts@1, _opts@1, []),
_connect_timeout@1 = proplists:get(connect_timeout, _opts@1, 5000),
_options@1 = lists:merge(_transport_opts@2, [{packet, raw}, {mode, binary}, {active, false}]),
case _mod@1:connect(addr_for(_addr@1), port_for(_addr@1), _options@1, _connect_timeout@1) of
{ok, _port@1} ->
case _mod@1:send(_port@1, <<"VST/1.1\r\n\r\n">>) of
ok -> {ok, {_mod@1, _port@1}};
_@3 -> _@3
end;
_@3 -> _@3
end.
decode_pair({_key@1, _value@1}, _acc@1) ->
case case _key@1 /= <<>> of
false -> false;
true -> binary:last(_key@1) == 93
end
of
false -> assign_map(_acc@1, _key@1, _value@1);
true ->
_subkey@1 = binary:part(_key@1,
0,
erlang:byte_size(_key@1) - 1),
assign_split(binary:split(_subkey@1, <<"[">>),
_value@1,
_acc@1,
binary:compile_pattern(<<"][">>))
end.
decode_stream(_@1) -> decode_stream(_@1, []).
decode_stream(<<>>, _acc@1) -> {ok, _acc@1};
decode_stream(_stream@1, _acc@1) ->
case eVPack:decode(_stream@1) of
{ok, {_term@1, _rest@1}} ->
decode_stream(_rest@1, erlang:'++'(_acc@1, [_term@1]));
{ok, _term@2} -> {ok, erlang:'++'(_acc@1, [_term@2])};
{error, _reason@1} -> {error, _reason@1}
end.
headers_for(#{} = _headers@1) -> _headers@1;
headers_for(_headers@1)
when erlang:is_list(_headers@1) ->
maps:from_list(_headers@1).
method_for(delete) -> 0;
method_for(get) -> 1;
method_for(post) -> 2;
method_for(put) -> 3;
method_for(head) -> 4;
method_for(patch) -> 5;
method_for(options) -> 6;
method_for(_) -> -1.
port_for({unix, __path@1}) -> 0;
port_for({tcp, __host@1, _port@1}) -> _port@1.
prepend_chunk(_chunk@1, _chunk_n@1, _is_first@1,
_msg_id@1, _msg_length@1) ->
<<(24 + erlang:byte_size(_chunk@1)):32/integer-little,
(binary:decode_unsigned(<<_chunk_n@1:31/integer,
_is_first@1:1/integer>>,
little)):32/integer,
_msg_id@1:64/integer-little,
_msg_length@1:64/integer-little, _chunk@1/binary>>.
query_for(nil) -> #{}.
recv_chunk({_mod@1, _port@1}, _chunk_length@1) ->
_mod@1:recv(_port@1, _chunk_length@1 - 24).
recv_header({_mod@1, _port@1}) ->
case _mod@1:recv(_port@1, 24) of
{ok,
<<_chunk_length@1:32/integer-little,
_chunk_x@1:32/integer, _msg_id@1:64/integer-little,
_msg_length@1:64/integer-little>>} ->
<<_chunk_n@1:31/integer, _is_first@1:1/integer>> =
<<_chunk_x@1:32/integer-little>>,
{ok,
[_chunk_length@1,
_chunk_n@1,
_is_first@1,
_msg_id@1,
_msg_length@1]};
{error, _reason@1} -> {error, _reason@1}
end.
recv_stream(_socket@1,
[_chunk_length@1, 1, 1, __msg_id@1, __msg_length@1]) ->
recv_chunk(_socket@1, _chunk_length@1);
recv_stream(_socket@1,
[_chunk_length@1,
_n_chunks@1,
1,
__msg_id@1,
__msg_length@1]) ->
case recv_chunk(_socket@1, _chunk_length@1) of
{ok, _buffer@1} ->
case recv_stream(_socket@1, _n_chunks@1, _buffer@1) of
{ok, _stream@1} -> {ok, _stream@1};
_@1 -> _@1
end;
_@1 -> _@1
end.
recv_stream(_socket@1, _n_chunks@1, _buffer@1) ->
lists:reduce_while(lists:seq(1, _n_chunks@1 - 1),
_buffer@1,
fun(_n@1, _buffer@2) ->
case recv_header(_socket@1) of
{ok, [_chunk_length@1, _, _, _, _]} ->
case recv_chunk(_socket@1, _chunk_length@1) of
{ok, _chunk@1} ->
case _n@1 == _n_chunks@1 - 1 of
false ->
{cont, <<_buffer@2/binary, _chunk@1/binary>>};
true ->
{halt, {ok, <<_buffer@2/binary, _chunk@1/binary>>}}
end;
_@1 ->
case _@1 of
{error, _reason@1} ->
{halt, {error, _reason@1}};
_@2 ->
erlang:error({with_clause, _@2})
end
end;
_@1 ->
case _@1 of
{error, _reason@1} ->
{halt, {error, _reason@1}};
_@2 ->
erlang:error({with_clause, _@2})
end
end
end).
request(#{method := _method@1, path := _path@1, headers := _headers@1, body := _body@1}, #{socket := _socket@1, database := _database@1} = _state@1) ->
#{path := _path@2, query := _query@1} = http_uri:parse(_path@1),
{_database@3, _path@4} =
case _path@2 of
<<"/_db/", _rest@1/binary>> ->
[_database@2, _path@3] = binary:split(_rest@1, <<"/">>),
{_database@2, <<"/", _path@3/binary>>};
_ ->
{case _database@1 of
_@1 when _@1 =:= false orelse _@1 =:= nil ->
<<>>;
_@2 -> _@2
end, _path@2}
end,
_request@1 = [1, 1, _database@3, method_for(_method@1), _path@4, query_for(_query@1), headers_for(_headers@1)],
case eVPack:encode(_request@1) of
{ok, _request@2} ->
case body_for(_body@1) of
{ok, _body@2} ->
case send_stream(_socket@1, build_stream(<<_request@2/binary, _body@2/binary>>))
of
ok ->
case recv_header(_socket@1) of
{ok, _header@1} ->
case recv_stream(_socket@1, _header@1) of
{ok, _stream@1} ->
case decode_stream(_stream@1) of
{ok, [[1, 2, _status@1, _headers@2] | _body@3]} ->
{ok, #{status => _status@1, headers => _headers@2, body => body_from(_body@3)}, _state@1};
_@3 ->
case _@3 of
{error, closed} ->
{error, noproc, _state@1};
{error, _reason@1} ->
{error, _reason@1, _state@1};
_@4 ->
erlang:error({with_clause, _@4})
end
end;
_@3 ->
case _@3 of
{error, closed} ->
{error, noproc, _state@1};
{error, _reason@1} ->
{error, _reason@1, _state@1};
_@4 ->
erlang:error({with_clause, _@4})
end
end;
_@3 ->
case _@3 of
{error, closed} ->
{error, noproc, _state@1};
{error, _reason@1} ->
{error, _reason@1, _state@1};
_@4 -> erlang:error({with_clause, _@4})
end
end;
_@3 ->
case _@3 of
{error, closed} -> {error, noproc, _state@1};
{error, _reason@1} ->
{error, _reason@1, _state@1};
_@4 -> erlang:error({with_clause, _@4})
end
end;
_@3 ->
case _@3 of
{error, closed} -> {error, noproc, _state@1};
{error, _reason@1} -> {error, _reason@1, _state@1};
_@4 -> erlang:error({with_clause, _@4})
end
end;
_@3 ->
case _@3 of
{error, closed} -> {error, noproc, _state@1};
{error, _reason@1} -> {error, _reason@1, _state@1};
_@4 -> erlang:error({with_clause, _@4})
end
end.
send_stream(Socket, IoData) ->
tcp:send(Socket, IoData).
vst_maxsize() -> 30720.

Načítá se…
Zrušit
Uložit