浏览代码

agHttpCli 优化调整

erlArango_v1
SisMaker 5 年前
父节点
当前提交
1843c073d9
共有 11 个文件被更改,包括 597 次插入269 次删除
  1. +33
    -24
      include/agHttpCli.hrl
  2. +1
    -0
      include/erlArango.hrl
  3. +344
    -186
      src/arangoApi/agCollections.erl
  4. +6
    -0
      src/httpCli/agAgencyPoolMgrIns.erl
  5. +10
    -5
      src/httpCli/agAgencyUtils.erl
  6. +20
    -15
      src/httpCli/agHttpCli.erl
  7. +36
    -16
      src/httpCli/agHttpProtocol.erl
  8. +6
    -2
      src/httpCli/agMiscUtils.erl
  9. +7
    -6
      src/httpCli/agSslAgencyIns.erl
  10. +133
    -14
      src/httpCli/agTcpAgencyIns.erl
  11. +1
    -1
      src/httpCli/test.erl

+ 33
- 24
include/agHttpCli.hrl 查看文件

@ -7,21 +7,13 @@
-define(DEFAULT_DBNAME, <<"_db/_system">>).
-define(USER_PASSWORD, <<"root:156736">>).
-define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_INIT_OPTS, undefined).
-define(DEFAULT_CONNECT_TIMEOUT, 5000).
-define(DEFAULT_POOL_SIZE, 16).
-define(DEFAULT_POOL_STRATEGY, random).
-define(DEFAULT_POOL_OPTIONS, []).
-define(DEFAULT_IS_RECONNECT, true).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_RECONNECT_MIN, 500).
-define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_TIMEOUT, infinity).
-define(DEFAULT_BODY, undefined).
-define(DEFAULT_HEADERS, []).
-define(DEFAULT_PID, self()).
-define(DEFAULT_PROTOCOL, tcp).
-define(DEFAULT_PORTO(Protocol), 8529).
%%-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end).
-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)).
@ -49,6 +41,7 @@
-record(requestRet, {
statusCode :: undefined | 100..505,
contentLength :: undefined | non_neg_integer() | chunked,
headers :: undefined | [binary()],
body :: undefined | binary()
}).
@ -67,25 +60,40 @@
current :: non_neg_integer() | undefined
}).
-record(srvState, {
poolName :: poolName(),
serverName :: serverName(),
userPassWord :: binary(),
host :: binary(),
dbName :: binary(),
rn :: binary:cp(),
rnrn :: binary:cp(),
reconnectState :: undefined | reconnectState(),
socket :: undefined | ssl:sslsocket(),
timerRef :: undefined | reference()
}).
-record(cliState, {
isHeadMethod = false :: boolean(), %% <<"HEAD">>
%method = undefined :: undefined | method(),
requestsIn = 1 :: non_neg_integer(),
requestsOut = 0 :: non_neg_integer(),
status = leisure :: waiting | leisure,
backlogNum = 0 :: integer(),
backlogSize = 0 :: integer(),
status = leisure :: waiting | leisure,
curInfo = undefined :: tuple(),
recvState :: recvState() | undefined
recvState = undefined :: recvState() | undefined
}).
-record(dbOpts, {
host :: host(),
port :: 0..65535,
hostname :: string(),
hostname :: hostName(),
dbName :: binary(),
protocol :: protocol(),
poolSize :: binary(),
userPassword :: binary(),
socketOpts :: [gen_tcp:connect_option(), ...]
socketOpts :: socketOpts()
}).
-record(agencyOpts, {
@ -99,6 +107,7 @@
-type miAgHttpCliRet() :: #miAgHttpCliRet{}.
-type requestRet() :: #requestRet{}.
-type recvState() :: #recvState{}.
-type srvState() :: #srvState{}.
-type cliState() :: #cliState{}.
-type reconnectState() :: #reconnectState{}.
@ -111,26 +120,26 @@
-type body() :: iodata() | undefined.
-type path() :: binary().
-type host() :: binary().
-type hostName() :: string().
-type poolSize() :: pos_integer().
-type backlogSize() :: pos_integer() | infinity.
-type requestId() :: {serverName(), reference()}.
-type externalRequestId() :: term().
-type response() :: {externalRequestId(), term()}.
-type socket() :: inet:socket() | ssl:sslsocket().
-type socketOpts() :: [gen_tcp:connect_option(), ...].
-type error() :: {error, term()}.
-type dbCfg() ::
{baseUrl, binary()} |
{dbName, binary()} |
{userPassword, binary()} |
{poolSize, poolSize()} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
{baseUrl, binary()} |
{dbName, binary()} |
{userPassword, binary()} |
{poolSize, poolSize()} |
{socketOpts, [gen_tcp:connect_option(), ...]}.
-type agencyCfg() ::
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
{reconnect, boolean()} |
{backlogSize, backlogSize()} |
{reconnectTimeMin, pos_integer()} |
{reconnectTimeMax, pos_integer()}.
-type dbCfgs() :: [dbCfg()].
-type dbOpts() :: #dbOpts{}.

+ 1
- 0
include/erlArango.hrl 查看文件

@ -4,4 +4,5 @@
-define(Get, <<"GET">>).
-define(Put, <<"PUT">>).
-define(Post, <<"POST">>).
-define(Head, <<"HEAD">>).
-define(Delete, <<"DELETE">>).

+ 344
- 186
src/arangoApi/agCollections.erl 查看文件

@ -3,220 +3,378 @@
-compile([export_all, nowarn_export_all]).
% doc_address:https://www.arangodb.com/docs/stable/http/collection.html
%
% POST /_api/collection
%
% waitForSyncReplication10
% forceReplicationFactor10
%
% JSON对象是必需的
% name
% waitForSynctruefalse
% doCompacttrueMMFiles存储引擎有意义
% journalSize10485761 MiBMMFiles存储引擎有意义
% isSystemtruecollection-name 线API实现者来创建系统集合使false
% isVolatiletrue
% 使ArangoDB不会对磁盘CRC校验和
% falseMMFiles存储引擎有意义
% keyOptionskeyOptions JSON数组
% typeuuid和填充生成器
% inital偏移和间隔可以被配置成在填充密钥发生器以上升辞书排序顺序生成的固定长度16
% RocksDB 使使UUID密钥生成器生成通用的唯一128位密钥
%
% allowUserKeystrue_key属性中提供自己的键值 false_key属性中提供自己的密钥值被视为错误
%
% offset
% type2
% 2
% 3
% indexBuckets使1621024
%
% 1亿64ArangoDB版本中使用MMFiles存储引擎有意义
% numberOfShards1
% shardKeys[ _key]使
%
% plicationFactor1DBServer上保留多少个副本1k的值表示保留k-1DBServer上
% leader follower
%
% DistributionShardsLike 使
% 仿
% shardingStrategyArangoDB 3.4shardingStrategy
% 使使
%
% community-compat3.4ArangoDB社区版使用的默认分片
% enterprise-compat3.4ArangoDB企业版使用的默认分片
% enterprise-smart-edge-compat3.4ArangoDB Enterprise Edition中的智能边缘集合使用的默认分片
% hash3.4
% enterprise-hash-smart-edge3.4
% enterprise-hash-smart-edgeArangoDB
% waitForSyncReplication1
% 0
% forceReplicationFactor1
% 0
%
% smartJoinAttributeEnterprise Edition集群中
% 使 distributedShardsLike属性设置为另一个集合的名称shardKeys属性设置为单个shard key属性smartJoinAttribute中存储的值都必须是字符串
%
%
% 400HTTP 400
% 404HTTP 404
newColl(PoolName, Param) ->
BodyStr = jiffy:encode(Param),
agHttpCli:callAgency(PoolName, ?Post, <<"/_api/collection">>, [], BodyStr, infinity).
%
% JSON对象是必需的
% name
% waitForSynctruefalse
% doCompacttrueMMFiles存储引擎有意义
% journalSize10485761 MiB
% MMFiles存储引擎有意义
% isSystemtruecollection-name 线
% API实现者来创建系统集合使false
% isVolatiletrue
% 使
% ArangoDB不会对磁盘CRC校验和
%
% falseMMFiles存储引擎有意义
% keyOptionskeyOptions JSON数组
% type uuid的和填充的
%
% inital偏移和间隔可以被配置成在填充密钥发生器以上升辞书排序顺序生成的固定长度16
% RocksDB配合使用的理想选择
% 使UUID密钥生成器生成通用唯一的128位密钥
%
%
% allowUserKeystrue_key属性中提供自己的键值 false
% _key属性中提供自己的密钥值被视为错误
% incremen
% offset
% type2type值有效
% 2
% 3
% indexBuckets使1621024
%
% 1亿64
% ArangoDB版本中使用
% MMFiles存储引擎有意
% shardKeys[ _key]
%
%
% plicationFactor1DBServer上保留多少个副本
% 1k的值表示保留k-1DBServer上
% leader follower
%
% writeConcern1DBServer上同步每个分片需要多少个副本
% writeConcern的值 ReplicationFactor
% DistributionShardsLike
% 使仿
% shardingStrategyArangoDB 3.4
% shardingStrategy 使使
%
%
% community-compat3.4ArangoDB社区版使用的默认分片
% enterprise-compat3.4ArangoDB企业版使用的默认分片
% enterprise-smart-edge-compat3.4ArangoDB Enterprise Edition中的智能边缘集合使用的默认分片
% hash3.4
% enterprise-hash-smart-edge3.4
% enterprise-hash-smart-edgeArangoDB
% smartJoinAttribute
% 使 distributedShardsLike属性设置为另一个集合的名称shardKeys属性设置为单个shard key属性
% smartJoinAttribute中存储的值都必须是字符串
%
% 400HTTP 400
% 404HTTP 404
newColl(PoolNameOrSocket, Args) ->
BodyStr = jiffy:encode(Args),
agHttpCli:callAgency(PoolNameOrSocket, ?Post, <<"/_api/collection">>, [], BodyStr).
newColl(PoolNameOrSocket, Args, WaitForSyncReplication, ForceReplicationFactor) ->
BodyStr = jiffy:encode(Args),
Path = <<"/_api/collection?waitForSyncReplication=", (erlang:integer_to_binary(WaitForSyncReplication))/binary, "&forceReplicationFactor=", (erlang:integer_to_binary(ForceReplicationFactor))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Post, Path, [], BodyStr).
%
% DELETE /_api/collection/{collection-name}
delColl(PoolName, CoolName, IsSystem) ->
delColl(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined).
delColl(PoolNameOrSocket, CollName, IsSystem) ->
case IsSystem of
true ->
Path = <<"/_api/collection/", CoolName/binary, "?isSystem=true">>,
agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity);
Path = <<"/_api/collection/", CollName/binary, "?isSystem=true">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined);
_ ->
Path = <<"/_api/collection/", CoolName/binary>>,
agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity)
Path = <<"/_api/collection/", CollName/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined)
end.
%%
%% PUT /_api/collection/{collection-name}/truncate
clearColl(PoolName, CoolName) ->
Path = <<"/_api/collection/", CoolName/binary, "/truncate">>,
agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity).
%%
%% GET /_api/collection/{collection-name}
%%
%% id
%% name
%% status
%% 1
%% 2
%% 3
%% 4
%% 5
%% 6
%%
%% type
%% 2
%% 3
%% isSystemtrue
collInfo(PoolName, CoolName) ->
Path = <<"/_api/collection/", CoolName/binary>>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity).
%%
%%
%% GET /_api/collection/{collection-name}/properties
collProperties(PoolName, CoolName) ->
Path = <<"/_api/collection/", (CoolName)/binary, "/properties">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity).
%%
%%
%%
%% GET /_api/collection/{collection-name}/count
collCount(PoolName, CoolName) ->
Path = <<"/_api/collection/", CoolName/binary, "/count">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity).
%%
%%
%% GET /_api/collection/{collection-name}/figures
%%
collFigures(PoolName, CoolName) ->
Path = <<"/_api/collection/", CoolName/binary, "/figures">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity).
%%
%% 退
%% PUT /_api/collection/{collection-name}/responsibleShard
%%
%% collection-name
%% json
%% JSON对象
%% ID
%% JSON文档
%% shardId属性的JSON对象ID
%%
collResponsibleShard(PoolName, CoolName, Param) ->
Path = <<"/_api/collection/", CoolName/binary, "/responsibleShard">>,
BodyStr = jiffy:encode(Param),
agHttpCli:callAgency(PoolName, ?Get, Path, [], BodyStr, infinity).
%% ID
%% ID
%% ID的JSON数组
%% details参数设置为trueID作为对象属性键的JSON对象
%%
%% GET /_api/collection/{collection-name}/shards
%% Query Parameters
%% details (optional)true
collShards(PoolName, CoolName, IsDetails) ->
%
% PUT /_api/collection/{collection-name}/truncate
clearColl(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/truncate">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).
%
% GET /_api/collection/{collection-name}
%
% id
% name
% status
% 1
% 2
% 3
% 4
% 5
% 6
%
% type
% 2
% 3
% isSystemtrue
%
%
% 404HTTP 404
collInfo(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
%
% GET /_api/collection/{collection-name}/properties
collProperties(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", (CollName)/binary, "/properties">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
%
%
%
% GET /_api/collection/{collection-name}/count
collCount(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/count">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
%
% GET /_api/collection/{collection-name}/figures
%
%
%
% JSON文件的文件大小fileSize值以字节为单位报告
% 使
% fileSize结果中报告
% 100使使fileSize值的总和
% fileSize值的总和用作磁盘使用率的下限近似值
collFigures(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/figures">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
%
% PUT /_api/collection/{collection-name}/responsibleShard
%
% collection-name
% json
% JSON对象
% ID
% JSON文档
% shardId属性的JSON对象ID
%
% eg: args = #{'_key' => testkey, value => 23}
collResponsibleShard(PoolNameOrSocket, CollName, Args) ->
Path = <<"/_api/collection/", CollName/binary, "/responsibleShard">>,
BodyStr = jiffy:encode(Args),
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], BodyStr).
% ID
% ID
% ID的JSON数组
% details参数设置为trueID作为对象属性键的JSON对象
%
% GET /_api/collection/{collection-name}/shards
% Query Parameters
% details (optional)true
collShards(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/shards">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
collShards(PoolNameOrSocket, CollName, IsDetails) ->
case IsDetails of
true ->
Path = <<"/_api/collection/", CoolName/binary, "/shards?details=true">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity);
Path = <<"/_api/collection/", CollName/binary, "/shards?details=true">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined);
_ ->
Path = <<"/_api/collection/", CoolName/binary, "/shards">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity)
Path = <<"/_api/collection/", CollName/binary, "/shards">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined)
end.
%% ID
%% ID
%% IDID是服务器生成的字符串使
%% ID作为字符串
%% GET /_api/collection/{collection-name}/revision
collRevision(PoolName, CoolName) ->
Path = <<"/_api/collection/", CoolName/binary, "/revision">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity).
%%
%%
%%
%% withRevisionsID
%% withData
%% ID
%% ArangoDB实例上的两个集合是否包含相同的内容
%% _key系统属性来计算_from和_to也将包含在计算中
%% withRevisions设置为trueID_rev系统属性
%% withData提供值为true的值 使
%% JSON对象
%% checksum
%% ID作为字符串
%%
%% GET /_api/collection/{collection-name}/checksum
collChecksum(PoolName, CoolName, IsWithRevisions, IsWithData) ->
% ID
% IDID是服务器生成的字符串使
% ID作为字符串
% GET /_api/collection/{collection-name}/revision
collRevision(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/revision">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
%
%
%
% withRevisionsID
% withData
% ID
% ArangoDB实例上的两个集合是否包含相同的内容
% _key系统属性来计算_from和_to也将包含在计算中
% withRevisions设置为trueID_rev系统属性
% withData提供值为true的值 使
% JSON对象
% checksum
% ID作为字符串
%
% GET /_api/collection/{collection-name}/checksum
collChecksum(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/checksum">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined).
collChecksum(PoolNameOrSocket, CollName, IsWithRevisions, IsWithData) ->
case IsWithRevisions orelse IsWithData of
false ->
Path = <<"/_api/collection/", CoolName/binary, "/checksum">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity);
Path = <<"/_api/collection/", CollName/binary, "/checksum">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined);
_ ->
Path = <<"/_api/collection/", CoolName/binary, "/checksum?withRevisions=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary, "&withData=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary>>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity)
Path = <<"/_api/collection/", CollName/binary, "/checksum?withRevisions=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary, "&withData=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined)
end.
%%
%%
%%
%%
%%excludeSystem
%% GET /_api/collection
collList(PoolName, CoolName, IsExcludeSystem) ->
%
%
% excludeSystem
% GET /_api/collection
collList(PoolNameOrSocket) ->
agHttpCli:callAgency(PoolNameOrSocket, ?Get, <<"/_api/collection">>, [], undefined).
collList(PoolNameOrSocket, IsExcludeSystem) ->
case IsExcludeSystem of
false ->
Path = <<"/_api/collection/", CoolName/binary, "/checksum">>,
agHttpCli:callAgency(PoolName, ?Get, <<"/_api/collection">>, [], undefined, infinity);
agHttpCli:callAgency(PoolNameOrSocket, ?Get, <<"/_api/collection">>, [], undefined);
_ ->
Path = <<"/_api/collection/?excludeSystem=true">>,
agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity)
agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined)
end.
%%
%%
%% countcount设置为 false可以加快加载集合的速度
%% PUT /_api/collection/{collection-name}/load
collLoad(PoolName, CoolName, IsCount) ->
%
%
% countcount设置为 false可以加快加载集合的速度 true
% PUT /_api/collection/{collection-name}/load
collLoad(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/load">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).
collLoad(PoolNameOrSocket, CollName, IsCount) ->
case IsCount of
false ->
Path = <<"/_api/collection/", CoolName/binary, "/load">>,
agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity);
Path = <<"/_api/collection/", CollName/binary, "/load">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], <<"{\"count\":false}">>);
_ ->
Path = <<"/_api/collection/", CoolName/binary, "/load">>,
agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity)
Path = <<"/_api/collection/", CollName/binary, "/load">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined)
end.
%
% PUT /_api/collection/{collection-name}/unload
%
% collection-name使
% id
% name
% status
% type
% 2
% 3
% isSystemtrue
collUnload(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/unload">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).
%
% PUT /_api/collection/{collection-name}/loadIndexesIntoMemory
%
% collection-namecollection的所有索引条目缓存到主内存中
%
%
%
% RocksDB存储引擎上有用MMFiles引擎中
% RocksDB上
%
%
collLoadIndexesIntoMemory(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/loadIndexesIntoMemory">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).
%
% PUT /_api/collection/{collection-name}/properties
%
% collection-name
%
% waitForSynctrue
% journalSize10485761 MBjournalSize值时
%
% id
% name
% waitForSync
% journalSize
% status
% type
% 2
% 3
% isSystemtrue
% isVolatiletrueArangoDB不会将数据写入或同步到磁盘
% doCompact
% keyOptionsJSON对象
% typeuuid的
% allowUserKeystrue_key属性中提供自己的键值 false_key属性中提供自己的密钥值被视为错误
% waitForSyncjournalSize和name之外使
collChangeProperties(PoolNameOrSocket, CollName, Args) ->
Path = <<"/_api/collection/", CollName/binary, "/properties">>,
BodyStr = jiffy:encode(Args),
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], BodyStr).
%
% PUT /_api/collection/{collection-name}/rename
%
% collection-name
%
% name
%
% id
% name
% status
% type
% 2
% 3
% isSystemtrue
% _graphs在当前数据库中该集合内的所有图形定义中重命名
%
collRename(PoolNameOrSocket, OldName, NewName) ->
Path = <<"/_api/collection/", OldName/binary, "/rename">>,
NameStr = jiffy:encode(NewName),
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], <<"{\"name\":", NameStr/binary, "}">>).
%
% PUT /_api/collection/{collection-name}/rotate
%
% collection-name
% Rotate方法的目的是使文件中的数据可用于压缩
%
%
% true
% MMFiles存储引擎
%
% 400HTTP 400
% 404HTTP 404
collRotate(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/rotate">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).
%
% PUT /_api/collection/{collection-name}/recalculateCount
%
% collection-name
%
%
% true
% RocksDB存储引擎
collRecalculateCount(PoolNameOrSocket, CollName) ->
Path = <<"/_api/collection/", CollName/binary, "/recalculateCount">>,
agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined).

+ 6
- 0
src/httpCli/agAgencyPoolMgrIns.erl 查看文件

@ -43,6 +43,11 @@ handleMsg(_Msg, State) ->
{ok, State}.
terminate(_Reason, _State) ->
io:format("IMY******************* agAgencyPoolMgrIns terminate ~p~n ", [_Reason]),
ets:delete_all_objects(?ETS_AG_Pool),
ets:delete_all_objects(?ETS_AG_Agency),
agKvsToBeam:load(?agBeamPool, []),
agKvsToBeam:load(?agBeamAgency, []),
ok.
-spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}.
@ -152,6 +157,7 @@ cacheDelAgency(PoolName) ->
agKvsToBeam:load(?agBeamAgency, KVS),
ok.
-spec getOneAgency(atom()) -> atom() | {error, term()}.
getOneAgency(PoolName) ->
case ?agBeamPool:get(PoolName) of
undefined ->

+ 10
- 5
src/httpCli/agAgencyUtils.erl 查看文件

@ -14,29 +14,34 @@
, reconnectTimer/2
, agencyReply/2
, agencyReply/4
, agencyReplyAll/1
, initReconnectState/3
, resetReconnectState/1
, updateReconnectState/1
]).
-spec getQueue(pos_integer()) -> undefined | miRequest().
getQueue(RequestsIn) ->
erlang:get(RequestsIn).
-spec addQueue(pos_integer(), miRequest()) -> undefined.
addQueue(RequestsIn, MiRequest) ->
erlang:put(RequestsIn, MiRequest).
-spec delQueue(pos_integer()) -> miRequest().
delQueue(RequestsIn) ->
erlang:erase(RequestsIn).
-spec clearQueue() -> term().
clearQueue() ->
erlang:erase().
dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) ->
agAgencyUtils:agencyReply(CurInfo, Reply),
agAgencyUtils:agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState).
-spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}.
dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) ->
agencyReply(CurInfo, Reply),
agencyReplyAll(Reply),
reconnectTimer(SrvState, ClientState#cliState{requestsIn = 1, requestsOut = 0, backlogNum = 0}).
-spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}.
reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) ->
{ok, {SrvState#srvState{socket = undefined}, CliState}};
reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) ->

+ 20
- 15
src/httpCli/agHttpCli.erl 查看文件

@ -1,5 +1,6 @@
-module(agHttpCli).
-include("agHttpCli.hrl").
-include("erlArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -82,8 +83,8 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
_ ->
case getCurDbInfo(PoolNameOrSocket) of
{DbName, UserPassWord, Host, Protocol} ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
io:format("IMY*******************************~n~p ~n",[Request]),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
io:format("IMY*******************************~n~p ~n", [Request]),
case Protocol of
tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of
@ -95,7 +96,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?Head);
{error, Reason} = Err ->
?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
@ -111,7 +112,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>));
receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?Head);
{error, Reason} = Err ->
?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket),
@ -130,16 +131,16 @@ receiveResponse(RequestId) ->
Reply
end.
-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}.
receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) ->
-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> requestRet() | {error, term()}.
receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
receive
{tcp, Socket, Data} ->
io:format("IMY******************************* ~p ~n ",[Data]),
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
io:format("IMY******************************* ~p ~n ", [Data]),
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, headers= Headers, body = Body};
#requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body};
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn);
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
@ -160,15 +161,15 @@ receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) ->
{error, {tcp_error, Reason}}
end.
-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}.
receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn) ->
-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> requestRet() | {error, term()}.
receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
receive
{ssl, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
#requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body};
{ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn);
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod);
{error, Reason} ->
?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket),
@ -201,7 +202,7 @@ startPool(PoolName, DbCfgs, AgencyCfgs) ->
stopPool(PoolName) ->
agAgencyPoolMgrIns:stopPool(PoolName).
-spec connectDb(dbCfgs()) -> {ok, socket()} | error.
-spec connectDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
connectDb(DbCfgs) ->
#dbOpts{
host = Host,
@ -240,6 +241,7 @@ connectDb(DbCfgs) ->
Err
end.
-spec disConnectDb(socket()) -> ok | {error, term()}.
disConnectDb(Socket) ->
case erlang:erase({'$agDbInfo', Socket}) of
undefined ->
@ -253,12 +255,15 @@ disConnectDb(Socket) ->
end
end.
-spec setCurDbInfo(socket(), binary(), tuple(), host(), protocol()) -> term().
setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) ->
erlang:put({'$agDbInfo', Socket}, {DbName, UserPassword, Host, Protocol}).
-spec getCurDbInfo(socket()) -> term().
getCurDbInfo(Socket) ->
erlang:get({'$agDbInfo', Socket}).
-spec setCurDbName(socket(), binary()) -> ok.
setCurDbName(Socket, NewDbName) ->
case erlang:get({'$agDbInfo', Socket}) of
undefined ->

+ 36
- 16
src/httpCli/agHttpProtocol.erl 查看文件

@ -7,8 +7,8 @@
-export([
headers/1
, request/7
, response/1
, response/4
, response/2
, response/5
]).
%% <<"Content-Type: application/json; charset=utf-8">>,
@ -45,12 +45,12 @@ request(true, Body, Method, Host, _DbName, Path, Headers) ->
].
-spec response(binary()) -> {ok, recvState(), binary()} | error().
response(Data) ->
response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data).
-spec response(binary(), boolean()) -> {ok, recvState(), binary()} | error().
response(Data, IsHeadMethod) ->
response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data, IsHeadMethod).
-spec response(undefined | recvState(), binary:cp(), binary:cp(), binary()) -> {ok, recvState()} | error().
response(undefined, Rn, RnRn, Data) ->
-spec response(undefined | recvState(), binary:cp(), binary:cp(), binary(), boolean()) -> {ok, recvState()} | error().
response(undefined, Rn, RnRn, Data, IsHeadMethod) ->
case parseStatusLine(Data, Rn) of
{StatusCode, Rest} ->
case splitHeaders(Rest, Rn, RnRn) of
@ -59,8 +59,13 @@ response(undefined, Rn, RnRn, Data) ->
{0, Headers, Rest} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}};
{chunked, Headers, Body} ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Body);
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}};
_ ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Body, IsHeadMethod)
end;
{ContentLength, Headers, Body} ->
BodySize = erlang:size(Body),
if
@ -70,7 +75,12 @@ response(undefined, Rn, RnRn, Data) ->
?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end
end;
not_enough_data ->
%% headers都不足
@ -82,7 +92,7 @@ response(undefined, Rn, RnRn, Data) ->
{error, Reason} ->
{error, Reason}
end;
response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = Buffer} = RecvState, Rn, _RnRn, Data) ->
response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = Buffer} = RecvState, Rn, _RnRn, Data, _IsHeadMethod) ->
NewBuffer = <<Buffer/binary, Data/binary>>,
case parseChunks(NewBuffer, Rn, []) of
{ok, AddBody, _Rest} ->
@ -94,7 +104,7 @@ response(#recvState{stage = body, contentLength = chunked, body = Body, buffer =
{error, Reason} ->
{error, Reason}
end;
response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data) ->
response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data, _IsHeadMethod) ->
CurData = <<Body/binary, Data/binary>>,
BodySize = erlang:size(CurData),
if
@ -106,7 +116,7 @@ response(#recvState{stage = body, contentLength = ContentLength, body = Body} =
true ->
{ok, RecvState#recvState{body = CurData}}
end;
response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) ->
response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data, IsHeadMethod) ->
CurBody = <<OldBody/binary, Data/binary>>,
case parseStatusLine(CurBody, Rn) of
{StatusCode, Rest} ->
@ -116,8 +126,13 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) ->
{0, Headers, Body} ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}};
{chunked, Headers, Rest} ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Rest);
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}};
_ ->
RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers},
response(RecvState, Rn, RnRn, Rest, IsHeadMethod)
end;
{ContentLength, Headers, Body} ->
BodySize = erlang:size(Body),
if
@ -127,7 +142,12 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) ->
?WARN(agTcpAgencyIns, "33 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]),
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
true ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of
true ->
{done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}};
_ ->
{ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}
end
end;
not_enough_data ->
%% headers都不足

+ 6
- 2
src/httpCli/agMiscUtils.erl 查看文件

@ -22,6 +22,7 @@ parseUrl(<<"https://", Rest/binary>>) ->
parseUrl(_) ->
{error, invalid_url}.
-spec parseUrl(protocol(), binary()) -> dbOpts().
parseUrl(Protocol, Rest) ->
{Host, _Path} =
case binary:split(Rest, <<"/">>, [trim]) of
@ -45,6 +46,7 @@ parseUrl(Protocol, Rest) ->
end,
#dbOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}.
-spec dbOpts(list()) -> dbOpts().
dbOpts(DbCfgs) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME),
@ -52,9 +54,10 @@ dbOpts(DbCfgs) ->
PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE),
SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS),
DbOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>,
UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(UserPassword))/binary>>},
DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
-spec agencyOpts(list()) -> agencyOpts().
agencyOpts(AgencyCfgs) ->
IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_IS_RECONNECT),
BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?DEFAULT_BACKLOG_SIZE),
@ -62,7 +65,7 @@ agencyOpts(AgencyCfgs) ->
Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_RECONNECT_MAX),
#agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}.
-spec getListValue(term(), list(), term()) -> term().
getListValue(Key, List, Default) ->
case lists:keyfind(Key, 1, List) of
false ->
@ -82,6 +85,7 @@ randomElement([_ | _] = List) ->
T = list_to_tuple(List),
element(rand:uniform(tuple_size(T)), T).
-spec toBinary(term()) -> binary().
toBinary(Value) when is_integer(Value) -> integer_to_binary(Value);
toBinary(Value) when is_list(Value) -> list_to_binary(Value);
toBinary(Value) when is_float(Value) -> float_to_binary(Value, [{decimals, 6}, compact]);

+ 7
- 6
src/httpCli/agSslAgencyIns.erl 查看文件

@ -1,5 +1,6 @@
-module(agSslAgencyIns).
-include("agHttpCli.hrl").
-include("erlArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -34,7 +35,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -44,7 +45,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
ssl:close(Socket),
@ -59,8 +60,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
end;
handleMsg({ssl, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}),
case agAgencyUtils:getQueue(RequestsOut + 1) of
@ -166,7 +167,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case ssl:send(Socket, Request) of
ok ->
TimerRef =
@ -176,7 +177,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
ssl:close(Socket),

+ 133
- 14
src/httpCli/agTcpAgencyIns.erl 查看文件

@ -1,5 +1,6 @@
-module(agTcpAgencyIns).
-include("agHttpCli.hrl").
-include("erlArango.hrl").
-compile(inline).
-compile({inline_size, 128}).
@ -34,8 +35,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
_ ->
case Status of
leisure -> %%
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
io:format("IMY*******************************~n~p ~n",[Request]),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
io:format("IMY*******************************~n~p ~n", [Request]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
@ -45,7 +46,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]),
gen_tcp:close(Socket),
@ -60,8 +61,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod
end;
handleMsg({tcp, Socket, Data},
#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}),
case agAgencyUtils:getQueue(RequestsOut + 1) of
@ -88,12 +89,12 @@ handleMsg({timeout, TimerRef, waiting_over},
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
%% tcp tcp收到该次超时数据
gen_tcp:close(Socket),
self() ! ?miDoNetConnect,
{ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}};
handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1});
handleMsg({tcp_closed, Socket},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
CliState) ->
?WARN(ServerName, "connection closed~n", []),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed});
handleMsg({tcp_error, Socket, Reason},
#srvState{socket = Socket, serverName = ServerName} = SrvState,
@ -129,12 +130,129 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) ->
-spec terminate(term(), srvState(), cliState()) -> ok.
terminate(_Reason,
#srvState{timerRef = TimerRef} = SrvState,
_CliState = CliState) ->
#srvState{socket = Socket, timerRef = TimerRef} = SrvState,
#cliState{requestsOut = RequestsOut, status = Status} = CliState) ->
io:format("IMY*******************terminate ~p~n", [_Reason]),
{ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState),
gen_tcp:close(Socket),
agAgencyUtils:cancelTimer(TimerRef),
agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}),
agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}),
ok.
-spec overAllWork(srvState(), cliState()) -> ok.
overAllWork(#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket, timerRef = TimerRef} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState, status = Status} = CliState) ->
case Status of
leisure ->
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
ok;
MiRequest ->
overDealQueueRequest(MiRequest, SrvState, CliState)
end;
_ ->
overReceiveTcpData(SrvState, CliState)
end,
ok.
-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
true ->
%%
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}),
case agAgencyUtils:getQueue(RequestsOut + 2) of
undefined ->
ok;
MiRequest ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}});
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}),
agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error})
end
end.
-spec overReceiveTcpData(srvState(), cliState()) -> term().
overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState,
#cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) ->
receive
{tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} ->
agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}),
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
%% todo bug fix
ok;
MiRequest ->
overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined})
end;
{ok, NewRecvState} ->
overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState});
{error, Reason} ->
?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}})
catch
E:R:S ->
?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledata_error}}),
ok
end;
{timeout, _TimerRef, waiting_over} ->
gen_tcp:close(Socket),
agAgencyUtils:agencyReply(CurInfo, {error, timeout}),
case agAgencyUtils:getQueue(RequestsOut + 1) of
undefined ->
ok;
MiRequest ->
case ?agBeamPool:get(PoolName) of
#dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} ->
case dealConnect(ServerName, HostName, Port, SocketOpts) of
{ok, NewSocket} ->
%% buff之类状态数据
NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined},
overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState);
{error, _Reason} ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {new_tcp_connect_error_over, _Reason}}),
ok
end;
_Ret ->
agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}),
ok
end
end;
{tcp_closed, Socket} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}),
ok;
{tcp_error, Socket, Reason} ->
gen_tcp:close(Socket),
agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}),
ok
end.
-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}.
dealConnect(ServerName, HostName, Port, SocketOptions) ->
case inet:getaddrs(HostName, inet) of
{ok, IPList} ->
@ -151,9 +269,10 @@ dealConnect(ServerName, HostName, Port, SocketOptions) ->
{error, Reason}
end.
-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}.
dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem},
#srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState,
#cliState{requestsOut = RequestsOut} = CliState) ->
#cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) ->
agAgencyUtils:delQueue(RequestsOut + 1),
case erlang:system_time(millisecond) > OverTime of
true ->
@ -163,10 +282,10 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
undefined ->
{ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}};
MiRequest ->
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1})
dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1})
end;
_ ->
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]),
Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
case gen_tcp:send(Socket, Request) of
ok ->
TimerRef =
@ -176,7 +295,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
{ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}};
{error, Reason} ->
?WARN(ServerName, ":send error: ~p~n", [Reason]),
gen_tcp:close(Socket),

+ 1
- 1
src/httpCli/test.erl 查看文件

@ -5,7 +5,7 @@
start() ->
application:ensure_all_started(erlArango),
agHttpCli:startPool(tt, [{poolSize, 100}], []).
agHttpCli:startPool(tt, [{poolSize, 10}], []).
tt(C, N) ->
application:start(erlArango),

正在加载...
取消
保存