diff --git a/include/agHttpCli.hrl b/include/agHttpCli.hrl index c0efb73..4bba474 100644 --- a/include/agHttpCli.hrl +++ b/include/agHttpCli.hrl @@ -7,21 +7,13 @@ -define(DEFAULT_DBNAME, <<"_db/_system">>). -define(USER_PASSWORD, <<"root:156736">>). -define(DEFAULT_BACKLOG_SIZE, 1024). --define(DEFAULT_INIT_OPTS, undefined). -define(DEFAULT_CONNECT_TIMEOUT, 5000). -define(DEFAULT_POOL_SIZE, 16). --define(DEFAULT_POOL_STRATEGY, random). --define(DEFAULT_POOL_OPTIONS, []). -define(DEFAULT_IS_RECONNECT, true). --define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MIN, 500). +-define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_TIMEOUT, infinity). --define(DEFAULT_BODY, undefined). --define(DEFAULT_HEADERS, []). -define(DEFAULT_PID, self()). --define(DEFAULT_PROTOCOL, tcp). --define(DEFAULT_PORTO(Protocol), 8529). -%%-define(DEFAULT_PORTO(Protocol), case Protocol of tcp -> 80; _ -> 443 end). -define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {delay_send, true}, {nodelay, true}, {keepalive, true}, {recbuf, 1048576}, {send_timeout, 5000}, {send_timeout_close, true}]). -define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)). @@ -49,6 +41,7 @@ -record(requestRet, { statusCode :: undefined | 100..505, contentLength :: undefined | non_neg_integer() | chunked, + headers :: undefined | [binary()], body :: undefined | binary() }). @@ -67,25 +60,40 @@ current :: non_neg_integer() | undefined }). +-record(srvState, { + poolName :: poolName(), + serverName :: serverName(), + userPassWord :: binary(), + host :: binary(), + dbName :: binary(), + rn :: binary:cp(), + rnrn :: binary:cp(), + reconnectState :: undefined | reconnectState(), + socket :: undefined | ssl:sslsocket(), + timerRef :: undefined | reference() +}). + -record(cliState, { + isHeadMethod = false :: boolean(), %% 是否是<<"HEAD">>请求方法 + %method = undefined :: undefined | method(), requestsIn = 1 :: non_neg_integer(), requestsOut = 0 :: non_neg_integer(), - status = leisure :: waiting | leisure, backlogNum = 0 :: integer(), backlogSize = 0 :: integer(), + status = leisure :: waiting | leisure, curInfo = undefined :: tuple(), - recvState :: recvState() | undefined + recvState = undefined :: recvState() | undefined }). -record(dbOpts, { host :: host(), port :: 0..65535, - hostname :: string(), + hostname :: hostName(), dbName :: binary(), protocol :: protocol(), poolSize :: binary(), userPassword :: binary(), - socketOpts :: [gen_tcp:connect_option(), ...] + socketOpts :: socketOpts() }). -record(agencyOpts, { @@ -99,6 +107,7 @@ -type miAgHttpCliRet() :: #miAgHttpCliRet{}. -type requestRet() :: #requestRet{}. -type recvState() :: #recvState{}. +-type srvState() :: #srvState{}. -type cliState() :: #cliState{}. -type reconnectState() :: #reconnectState{}. @@ -111,26 +120,26 @@ -type body() :: iodata() | undefined. -type path() :: binary(). -type host() :: binary(). +-type hostName() :: string(). -type poolSize() :: pos_integer(). -type backlogSize() :: pos_integer() | infinity. -type requestId() :: {serverName(), reference()}. --type externalRequestId() :: term(). --type response() :: {externalRequestId(), term()}. -type socket() :: inet:socket() | ssl:sslsocket(). +-type socketOpts() :: [gen_tcp:connect_option(), ...]. -type error() :: {error, term()}. -type dbCfg() :: -{baseUrl, binary()} | -{dbName, binary()} | -{userPassword, binary()} | -{poolSize, poolSize()} | -{socketOpts, [gen_tcp:connect_option(), ...]}. + {baseUrl, binary()} | + {dbName, binary()} | + {userPassword, binary()} | + {poolSize, poolSize()} | + {socketOpts, [gen_tcp:connect_option(), ...]}. -type agencyCfg() :: -{reconnect, boolean()} | -{backlogSize, backlogSize()} | -{reconnectTimeMin, pos_integer()} | -{reconnectTimeMax, pos_integer()}. + {reconnect, boolean()} | + {backlogSize, backlogSize()} | + {reconnectTimeMin, pos_integer()} | + {reconnectTimeMax, pos_integer()}. -type dbCfgs() :: [dbCfg()]. -type dbOpts() :: #dbOpts{}. diff --git a/include/erlArango.hrl b/include/erlArango.hrl index c75d21a..a590841 100644 --- a/include/erlArango.hrl +++ b/include/erlArango.hrl @@ -4,4 +4,5 @@ -define(Get, <<"GET">>). -define(Put, <<"PUT">>). -define(Post, <<"POST">>). +-define(Head, <<"HEAD">>). -define(Delete, <<"DELETE">>). \ No newline at end of file diff --git a/src/arangoApi/agCollections.erl b/src/arangoApi/agCollections.erl index 88d0bae..9296aa5 100644 --- a/src/arangoApi/agCollections.erl +++ b/src/arangoApi/agCollections.erl @@ -3,220 +3,378 @@ -compile([export_all, nowarn_export_all]). +% doc_address:https://www.arangodb.com/docs/stable/http/collection.html + % 创建一个集合 % POST /_api/collection % 查询参数 -% waitForSyncReplication(可选):默认为1,这意味着如果所有副本都创建了集合,则服务器将仅向客户端报告成功。如果您想要更快的服务器响应并且不关心完全复制,则设置为0。 -% forceReplicationFactor(可选):默认为1,这意味着服务器将在创建时检查是否有足够的副本,否则将进行紧急救助。设置为0可禁用此额外检查。 -% -% 具有以下属性的JSON对象是必需的: -% name:集合的名称。 -% waitForSync:如果为true,则在从文档创建,更新,替换或删除操作返回之前,将数据同步到磁盘。(默认值:false) -% doCompact:是否压缩集合(默认为true),此选项仅对MMFiles存储引擎有意义。 -% journalSize:日志或数据文件的最大大小,以字节为单位。该值必须至少为1048576(1 MiB)。(默认值为配置参数)此选项仅对MMFiles存储引擎有意义。 -% isSystem:如果为true,则创建一个系统集合。在这种情况下,collection-name 应该以下划线开头。最终用户通常应仅创建非系统集合。在非常特殊的情况下,可能需要API实现者来创建系统集合,但通常情况下会使用常规集合。(默认为false) -% isVolatile:如果为true,则收集数据仅保留在内存中,而不是持久的。卸载集合将导致集合数据被丢弃。停止或重新启动服务器也将导致集合中的数据完全丢失。 -% 设置此选项将使生成的集合比常规集合快一点,因为ArangoDB不会对磁盘​​执行任何同步,也不会为数据文件计算任何CRC校验和(因为没有数据文件)。 -% 因此,此选项应仅用于高速缓存类型的集合,而不能用于否则无法重新创建的数据。(默认值为false)此选项仅对MMFiles存储引擎有意义。 -% keyOptions:密钥生成的其他选项。如果指定,则keyOptions 应该是包含以下属性的JSON数组: -% type:指定密钥生成器的类型。当前可用的生成器是传统生成器,自动递增生成器,uuid和填充生成器 。 -% 在传统的密钥生成器生成升序数字键。在自动增量密钥发生器生成升序数字键,所述inital偏移和间隔可以被配置成在填充密钥发生器以上升辞书排序顺序生成的固定长度(16个字节)的密钥。 -% 这是与RocksDB 引擎配合使用的理想选择,该引擎将稍微有利于按字典顺序升序插入的键。密钥生成器可以在单服务器或群集中使用。该UUID密钥生成器生成通用的唯一128位密钥, -% 这些密钥以十六进制人类可读格式存储。此密钥生成器可用于单服务器或群集中,以生成“看似随机的”密钥。此密钥生成器产生的密钥未按字典顺序排序。 -% allowUserKeys:如果设置为true,则允许在文档的_key属性中提供自己的键值 。如果设置为false,那么密钥生成器将仅负责生成密钥,并且在文档的_key属性中提供自己的密钥值被视为错误。 -% 增量:自动增量密钥生成器的增量值。不适用于其他密钥生成器类型。 -% offset:自动增量密钥生成器的初始偏移值。不适用于其他密钥生成器类型。 -% type:(默认值为2):要创建的集合的类型。类型的以下值有效: -% 2:文件收集 -% 3:边缘收集 -% indexBuckets:使用哈希表将索引拆分为的存储桶数。缺省值为16,并且此数字必须为2的幂且小于或等于1024。 -% 对于非常大的集合,应该增加此值,以避免在必须先构建或调整哈希表时产生长时间的停顿,因为存储桶是单独调整大小的,可以初始并行构建。 -% 例如,对于具有1亿个文档的集合,64可能是一个明智的值。当前,只有边缘索引才尊重该值,但是其他索引类型可能会在将来的ArangoDB版本中使用。下次加载集合时,将应用更改(请参见下文)。此选项仅对MMFiles存储引擎有意义。 -% numberOfShards:(默认值为1):在集群中,此值确定要为集合创建的分片数。在单服务器设置中,此选项没有意义。 -% shardKeys:(默认值为[“ _key”]):在集群中,此属性确定用于确定文档目标分片的文档属性。文档根据其分片键属性的值发送到分片。对文档中所有分片键属性的值进行哈希处理,然后使用哈希值确定目标分片。 -% 注意:分键属性值一旦设置就无法更改。在单个服务器设置中,此选项没有意义。 -% plicationFactor:(默认值为1):在集群中,此属性确定每个分片在不同的DBServer上保留多少个副本。值1表示仅保留一个副本(无同步复制)。k的值表示保留k-1个副本。任何两个副本驻留在不同的DBServer上。 -% 它们之间的复制是同步的,也就是说,在报告写入操作成功之前,对“ leader”副本的每个写入操作都会复制到所有“ follower”副本。 -% 如果服务器发生故障,则会自动检测到故障,并且其中一台拥有副本的服务器将接管业务,通常不会报告错误。 -% DistributionShardsLike:(默认值为“”):在企业版集群中,此属性将新创建的集合的分片详细信息绑定到指定的现有集合中。 注意:使用此参数会对原型集合产生影响。 -% 在删除分片模仿集合之前,不能再删除它。同样,仅模拟集合的备份和还原将生成有关丢失分片原型的警告(可以被覆盖)。 -% shardingStrategy:此属性指定用于集合的分片策略的名称。从ArangoDB 3.4开始,创建新集合时可以选择不同的分片策略。所选的shardingStrategy 值将对集合保持固定,此后无法更改。 -% 这对于使集合保持其分片设置并始终使用相同的初始分片算法查找已分发到分片的文档非常重要。 -% 可用的分片策略为: -% community-compat:版本3.4之前的ArangoDB社区版使用的默认分片 -% enterprise-compat:版本3.4之前的ArangoDB企业版使用的默认分片 -% enterprise-smart-edge-compat:版本3.4之前的ArangoDB Enterprise Edition中的智能边缘集合使用的默认分片 -% hash:从版本3.4开始用于新集合的默认分片(不包括智能边缘集合) -% enterprise-hash-smart-edge:从版本3.4开始,用于新智能边缘集合的默认分片 -% 如果未指定分片策略,则所有集合的默认值将为哈希,所有智能边缘集合的默认值将为enterprise-hash-smart-edge(需要ArangoDB 企业版)。手动覆盖分片策略尚不能提供好处,但是稍后可能会添加其他分片策略。 +% waitForSyncReplication(可选):默认为1,这意味着如果所有副本都创建了集合,则服务器将仅向客户端报告成功。 +% 如果您想要更快的服务器响应并且不关心完全复制,则设置为0。 +% forceReplicationFactor(可选):默认值为1,这意味着服务器将在创建时检查是否有足够的副本,否则将进行紧急救助。 +% 设置为0可禁用此额外检查。 % -% smartJoinAttribute:在Enterprise Edition集群中,此属性确定集合的属性,该属性必须包含所引用的智能联接集合的分片键值。此外,此集合中文档的分片键必须包含此属性的值,后跟冒号,然后是文档的实际主键。 -% 此功能只能在企业版中使用,并且需要将集合的 distributedShardsLike属性设置为另一个集合的名称。它还要求将集合的shardKeys属性设置为单个shard key属性,并在末尾添加一个附加的“:”。进一步的限制是,无论何时在集合中存储或更新文档,smartJoinAttribute中存储的值都必须是字符串。 -% -% 用给定名称创建一个新集合。该请求必须包含具有以下属性的对象。 -% 400:如果缺少集合名称,则返回HTTP 400。 -% 404:如果集合名称未知,则返回HTTP 404。 -newColl(PoolName, Param) -> - BodyStr = jiffy:encode(Param), - agHttpCli:callAgency(PoolName, ?Post, <<"/_api/collection">>, [], BodyStr, infinity). - -% 删除收藏 +% 具有以下属性的JSON对象是必需的: +% name:集合的名称。 +% waitForSync:如果为true,则在从文档创建,更新,替换或删除操作返回之前,将数据同步到磁盘。(默认值:false) +% doCompact:是否压缩集合(默认为true),此选项仅对MMFiles存储引擎有意义。 +% journalSize:日志或数据文件的最大大小,以字节为单位。该值必须至少为1048576(1 MiB)。 +% (默认值为配置参数)此选项仅对MMFiles存储引擎有意义。 +% isSystem:如果为true,则创建一个系统集合。在这种情况下,collection-name 应该以下划线开头。 +% 最终用户通常应仅创建非系统集合。在非常特殊的情况下,可能需要API实现者来创建系统集合,但通常会使用常规集合。(默认为false) +% isVolatile:如果为true,则收集数据仅保留在内存中,而不是持久的。卸载集合将导致集合数据被丢弃。 +% 停止或重新启动服务器也将导致集合中的数据完全丢失。设置此选项将使结果集合比常规集合快一点, +% 因为ArangoDB不会对磁盘​​执行任何同步,也不会为数据文件计算任何CRC校验和(因为没有数据文件)。 +% 因此,此选项应仅用于高速缓存类型的集合,而不应用于无法通过其他方式重新创建的数据。 +% (默认值为false)此选项仅对MMFiles存储引擎有意义。 +% keyOptions:密钥生成的其他选项。如果指定,则keyOptions 应该是包含以下属性的JSON数组: +% type:指定密钥生成器的类型。当前可用的生成器是 传统的,自动递增的,uuid的和填充的。 +% 在传统的密钥生成器生成升序数字键。在自动增量密钥发生器生成升序数字键, +% 所述inital偏移和间隔可以被配置成在填充密钥发生器以上升辞书排序顺序生成的固定长度(16个字节)的密钥。 +% 这是与RocksDB配合使用的理想选择 引擎,这将稍微有利于按字典顺序升序插入的键。 +% 密钥生成器可以在单服务器或群集中使用。的UUID密钥生成器生成通用唯一的128位密钥, +% 这被存储在十六进制人类可读的格式。该密钥生成器可用于单服务器或群集中,以生成“看似随机的”密钥。 +% 此密钥生成器生成的密钥未按字典顺序排序。 +% allowUserKeys:如果设置为true,则允许在文档的_key属性中提供自己的键值 。如果设置为false, +% 那么密钥生成器将仅负责生成密钥,并且在文档的_key属性中提供自己的密钥值被视为错误。 +% incremen:自动增量密钥生成器的增量值。不适用于其他密钥生成器类型。 +% offset:自动增量密钥生成器的初始偏移值。不适用于其他密钥生成器类型。 +% type:(默认值为2):要创建的集合的类型。以下type值有效: +% 2:文件收集 +% 3:边缘收集 +% indexBuckets:使用哈希表将索引拆分成的存储桶数。缺省值为16,并且此数字必须为2的幂且小于或等于1024。 +% 对于非常大的集合,应该增加此值,以避免必须先构建或调整哈希表时出现长时间的停顿,因为存储桶是分别调整大小的, +% 可以初始并行构建。例如,对于具有1亿个文档的集合,64可能是一个明智的值。当前,只有边缘索引才尊重该值, +% 但其他索引类型可能会在将来的ArangoDB版本中使用。下次加载集合时,将应用更改(请参见下文)。 +% 此选项仅对MMFiles存储引擎有意):在集群中,此值确定要为集合创建的分片数。在单服务器设置中,此选项没有意义。 +% shardKeys:(默认值为[“ _key”]):在集群中,此属性确定用于确定文档目标分片的文档属性。 +% 文档根据其分片键属性的值发送到分片。对文档中所有分片键属性的值进行哈希处理,并将哈希值用于确定目标分片。 +% 注意:分片键属性的值一旦设置就无法更改。在单个服务器设置中,此选项没有意义。 +% plicationFactor:(默认值为1):在集群中,此属性确定每个分片在不同的DBServer上保留多少个副本。 +% 值1表示仅保留一个副本(无同步复制)。k的值表示保留k-1个副本。任何两个副本驻留在不同的DBServer上。 +% 它们之间的复制是同步的,也就是说,在报告写入操作成功之前,对“ leader”副本的每个写入操作都会复制到所有“ follower”副本。 +% 如果服务器发生故障,则会自动检测到故障,并且其中一台拥有副本的服务器将接管业务,通常不会报告错误。 +% writeConcern:为此集合写关注点(默认值:1)。它确定在不同的DBServer上同步每个分片需要多少个副本。 +% 如果集群中的副本数量很少,那么分片将拒绝写入。但是,具有足够最新副本的分片写入将同时成功。writeConcern的值 不能大于ReplicationFactor。(仅集群) +% DistributionShardsLike:(默认值为“”):在企业版集群中,此属性将新创建的集合的分片详细信息绑定到指定的现有集合中。 +% 注意:使用此参数会对原型集合产生影响。在删除分片模仿集合之前,不能再删除它。同样,仅模拟集合的备份和还原将生成有关丢失分片原型的警告(可以被覆盖)。 +% shardingStrategy:此属性指定用于集合的分片策略的名称。从ArangoDB 3.4开始,创建新集合时可以选择不同的分片策略。 +% 所选的shardingStrategy 值对于集合将保持固定,此后无法更改。这对于使集合保持其分片设置并始终使用相同的 +% 初始分片算法查找已分发到分片的文档非常重要。 +% 可用的分片策略为: +% community-compat:版本3.4之前的ArangoDB社区版使用的默认分片 +% enterprise-compat:版本3.4之前的ArangoDB企业版使用的默认分片 +% enterprise-smart-edge-compat:版本3.4之前的ArangoDB Enterprise Edition中的智能边缘集合使用的默认分片 +% hash:从3.4版开始用于新集合的默认分片(不包括智能边缘集合) +% enterprise-hash-smart-edge:从版本3.4开始,用于新的智能边缘集合的默认分片 +% 如果未指定分片策略,则所有集合的默认值将为哈希,所有智能边缘集合的默认值将为enterprise-hash-smart-edge(需要ArangoDB 企业版)。手动覆盖分片策略尚不能提供好处,但是稍后可能会添加其他分片策略。 +% smartJoinAttribute:在企业版集群中,此属性确定集合的属性,该属性必须包含引用的智能联接集合的分片键值。此外,此集合中文档的分片键必须包含此属性的值,后跟冒号,然后是文档的实际主键。 +% 此功能只能在企业版中使用,并且需要将集合的 distributedShardsLike属性设置为另一个集合的名称。它还要求将集合的shardKeys属性设置为单个shard key属性,并在末尾添加一个附加的“:”。 +% 进一步的限制是,无论何时在集合中存储或更新文档,smartJoinAttribute中存储的值都必须是字符串。 +% 用给定名称创建一个新集合。该请求必须包含具有以下属性的对象。 +% 400:如果缺少集合名称,则返回HTTP 400。 +% 404:如果集合名称未知,则返回HTTP 404。 +newColl(PoolNameOrSocket, Args) -> + BodyStr = jiffy:encode(Args), + agHttpCli:callAgency(PoolNameOrSocket, ?Post, <<"/_api/collection">>, [], BodyStr). + +newColl(PoolNameOrSocket, Args, WaitForSyncReplication, ForceReplicationFactor) -> + BodyStr = jiffy:encode(Args), + Path = <<"/_api/collection?waitForSyncReplication=", (erlang:integer_to_binary(WaitForSyncReplication))/binary, "&forceReplicationFactor=", (erlang:integer_to_binary(ForceReplicationFactor))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Post, Path, [], BodyStr). + +% 删除集合 % DELETE /_api/collection/{collection-name} -delColl(PoolName, CoolName, IsSystem) -> +delColl(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined). + +delColl(PoolNameOrSocket, CollName, IsSystem) -> case IsSystem of true -> - Path = <<"/_api/collection/", CoolName/binary, "?isSystem=true">>, - agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity); + Path = <<"/_api/collection/", CollName/binary, "?isSystem=true">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined); _ -> - Path = <<"/_api/collection/", CoolName/binary>>, - agHttpCli:callAgency(PoolName, ?Delete, Path, [], undefined, infinity) + Path = <<"/_api/collection/", CollName/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Delete, Path, [], undefined) end. -%% 截断集合 -%% PUT /_api/collection/{collection-name}/truncate -clearColl(PoolName, CoolName) -> - Path = <<"/_api/collection/", CoolName/binary, "/truncate">>, - agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity). - -%% 返回有关集合的信息 -%% GET /_api/collection/{collection-name} -%% 结果是一个对象,该对象描述具有以下属性的集合: -%% id:集合的标识符。 -%% name:集合的名称。 -%% status:集合状态为数字。 -%% 1:新出生的收藏 -%% 2:已卸载 -%% 3:已加载 -%% 4:在卸载过程中 -%% 5:已删除 -%% 6:加载 -%% 其他每个状态都表示集合已损坏。 -%% type:集合的类型,以数字表示。 -%% 2:文件收集(正常情况) -%% 3:边缘收集 -%% isSystem:如果为true,则该集合为系统集合。 -collInfo(PoolName, CoolName) -> - Path = <<"/_api/collection/", CoolName/binary>>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity). - -%% 阅读集合的属性 -%% 读取指定集合的​​属性 -%% GET /_api/collection/{collection-name}/properties -collProperties(PoolName, CoolName) -> - Path = <<"/_api/collection/", (CoolName)/binary, "/properties">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity). - -%% 返回集合中的文档数量 -%% 计算集合中的文档 -%% 请注意,这将始终将集合加载到内存中。 -%% GET /_api/collection/{collection-name}/count -collCount(PoolName, CoolName) -> - Path = <<"/_api/collection/", CoolName/binary, "/count">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity). - -%% 返回统计信息收集 -%% 获取集合的统计信息 -%% GET /_api/collection/{collection-name}/figures -%% 除上述内容外,结果还包含文档数量和有关集合的其他统计信息。 注意:这将始终将集合加载到内存中 -collFigures(PoolName, CoolName) -> - Path = <<"/_api/collection/", CoolName/binary, "/figures">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity). - -%% 返回负责文档的分片 -%% 退还负责文件的分片 -%% PUT /_api/collection/{collection-name}/responsibleShard -%% 路径参数 -%% collection-name(必填):集合的名称。 -%% 请求正文(json) -%% 主体必须包含一个JSON对象,且至少将分片键属性设置为某些值。 -%% 返回负责给定文档(如果存在)或如果存在该文档将负责的分片的ID。 -%% 该请求必须正文必须包含一个JSON文档,其中至少将集合的分片键属性设置为某些值。 -%% 响应是一个具有shardId属性的JSON对象,其中将包含负责的分片的ID。 -%% 注意:此方法仅在群集协调器中可用。 -collResponsibleShard(PoolName, CoolName, Param) -> - Path = <<"/_api/collection/", CoolName/binary, "/responsibleShard">>, - BodyStr = jiffy:encode(Param), - agHttpCli:callAgency(PoolName, ?Get, Path, [], BodyStr, infinity). - -%% 返回集合永久链接的分片 ID -%% 返回集合的分片ID -%% 默认情况下,返回带有集合的分片ID的JSON数组。 -%% 如果details参数设置为true,它将返回一个以分片ID作为对象属性键的JSON对象,并将每个分片的负责服务器映射到它们。在详细的响应中,领导者碎片将排在阵列的首位。 -%% 注意:此方法仅在群集协调器中可用。 -%% GET /_api/collection/{collection-name}/shards -%% Query Parameters -%% details (optional):如果设置为true,则返回值还将包含集合碎片的负责服务器。 -collShards(PoolName, CoolName, IsDetails) -> +% 截断集合 +% PUT /_api/collection/{collection-name}/truncate +clearColl(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/truncate">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). + +% 返回有关集合的信息 +% GET /_api/collection/{collection-name} +% 结果是一个对象,该对象描述具有以下属性的集合: +% id:集合的标识符。 +% name:集合的名称。 +% status:集合状态为数字。 +% 1:新出生的收藏 +% 2:已卸载 +% 3:已加载 +% 4:在卸载过程中 +% 5:已删除 +% 6:加载 +% 其他每个状态都表示集合已损坏。 +% type:集合的类型,以数字表示。 +% 2:文件收集(正常情况) +% 3:边缘收集 +% isSystem:如果为true,则该集合为系统集合。 +% 返回码 +% +% 404:如果集合名称未知,则返回HTTP 404。 +collInfo(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +% 读取指定集合的属性 +% GET /_api/collection/{collection-name}/properties +collProperties(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", (CollName)/binary, "/properties">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +% 返回集合中的文档数量 +% 计算集合中的文档 +% 请注意,这将始终将集合加载到内存中。 +% GET /_api/collection/{collection-name}/count +collCount(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/count">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +% 获取集合的统计信息 +% GET /_api/collection/{collection-name}/figures +% 除上述内容外,结果还包含文档数量和有关集合的其他统计信息。 注意:这将始终将集合加载到内存中 +% 注意:仅存储在预写日志中的收集数据不会在结果中报告。收集预写日志后,可能会将文档添加到馆藏的日志和数据文件中, +% 这可能会修改馆藏的数字。 +% 此外,不报告集合和索引参数JSON文件的文件大小。这些文件的大小通常应为几个字节。另请注意,fileSize值以字节为单位报告, +% 并反映逻辑文件的大小。某些文件系统可能会使用优化(例如,稀疏文件),因此实际物理文件大小会有所不同。目录和子目录可能 +% 还需要文件系统中的空间,但是该空间不会在fileSize结果中报告。 +% 这意味着报告的数字不能准确性为100%反映集合的实际磁盘使用情况。集合的实际磁盘使用率通常略高于报告的fileSize值的总和 。 +% 仍然可以将fileSize值的总和用作磁盘使用率的下限近似值。 +collFigures(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/figures">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +% 返回负责文档的分片 +% PUT /_api/collection/{collection-name}/responsibleShard +% 路径参数 +% collection-name(必填):集合的名称。 +% 请求正文(json) +% 主体必须包含一个JSON对象,且至少将分片键属性设置为某些值。 +% 返回负责给定文档(如果存在)或如果存在该文档将负责的分片的ID。 +% 该请求必须正文必须包含一个JSON文档,其中至少将集合的分片键属性设置为某些值。 +% 响应是一个具有shardId属性的JSON对象,其中将包含负责的分片的ID。 +% 注意:此方法仅在群集协调器中可用。 +% eg: args = #{'_key' => testkey, value => 23} + +collResponsibleShard(PoolNameOrSocket, CollName, Args) -> + Path = <<"/_api/collection/", CollName/binary, "/responsibleShard">>, + BodyStr = jiffy:encode(Args), + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], BodyStr). + +% 返回集合永久链接的分片 ID +% 返回集合的分片ID +% 默认情况下,返回带有集合的分片ID的JSON数组。 +% 如果details参数设置为true,它将返回一个以分片ID作为对象属性键的JSON对象,并将每个分片的负责服务器映射到它们。在详细的响应中,领导者碎片将排在阵列的首位。 +% 注意:此方法仅在群集协调器中可用。 +% GET /_api/collection/{collection-name}/shards +% Query Parameters +% details (optional):如果设置为true,则返回值还将包含集合碎片的负责服务器。 +collShards(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/shards">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +collShards(PoolNameOrSocket, CollName, IsDetails) -> case IsDetails of true -> - Path = <<"/_api/collection/", CoolName/binary, "/shards?details=true">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity); + Path = <<"/_api/collection/", CollName/binary, "/shards?details=true">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined); _ -> - Path = <<"/_api/collection/", CoolName/binary, "/shards">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity) + Path = <<"/_api/collection/", CollName/binary, "/shards">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined) end. -%% 返回集合修订版ID 永久链接 -%% 检索馆藏修订ID -%% 除上述内容外,结果还将包含集合的修订版ID。修订ID是服务器生成的字符串,客户端可以使用该字符串检查自上次修订检查以来集合中的数据是否已更改。 -%% 版本:集合修订版本ID作为字符串。 -%% GET /_api/collection/{collection-name}/revision -collRevision(PoolName, CoolName) -> - Path = <<"/_api/collection/", CoolName/binary, "/revision">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity). - -%% 返回集合永久链接的校验和 -%% 返回指定集合的​​校验和 -%% 查询参数 -%% withRevisions(可选):是否在校验和计算中包括文档修订版ID。 -%% withData(可选):是否在校验和计算中包括文档主体数据。 -%% 将计算集合中元数据(键和可选的修订ID)以及文档数据的校验和。 -%% 校验和可用于比较不同ArangoDB实例上的两个集合是否包含相同的内容。集合的当前修订版也会返回,因此可以确保针对相同数据状态计算校验和。 -%% 默认情况下,校验和将仅根据集合中包含的文档的_key系统属性来计算。对于边缘集合,系统属性_from和_to也将包含在计算中。 -%% 通过将可选查询参数withRevisions设置为true,则校验和中包括修订版ID(_rev系统属性)。 -%% 通过为可选查询参数withData提供值为true的值,用户定义的文档属性也将包括在计算中。 注意:包括用户定义的属性将使校验和变慢。 -%% 响应是具有以下属性的JSON对象: -%% checksum:计算得出的校验和(以数字形式)。 -%% 版本:集合修订版本ID作为字符串。 -%% 注意:此方法在群集中不可用。 -%% GET /_api/collection/{collection-name}/checksum -collChecksum(PoolName, CoolName, IsWithRevisions, IsWithData) -> +% 返回集合修订版ID +% 除上述内容外,结果还将包含集合的修订版ID。修订ID是服务器生成的字符串,客户端可以使用该字符串检查自上次修订检查以来集合中的数据是否已更改。 +% 版本:集合修订版本ID作为字符串。 +% GET /_api/collection/{collection-name}/revision +collRevision(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/revision">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + +% 返回集合永久链接的校验和 +% 返回指定集合的​​校验和 +% 查询参数 +% withRevisions(可选):是否在校验和计算中包括文档修订版ID。 +% withData(可选):是否在校验和计算中包括文档主体数据。 +% 将计算集合中元数据(键和可选的修订ID)以及文档数据的校验和。 +% 校验和可用于比较不同ArangoDB实例上的两个集合是否包含相同的内容。集合的当前修订版也会返回,因此可以确保针对相同数据状态计算校验和。 +% 默认情况下,校验和将仅根据集合中包含的文档的_key系统属性来计算。对于边缘集合,系统属性_from和_to也将包含在计算中。 +% 通过将可选查询参数withRevisions设置为true,则校验和中包括修订版ID(_rev系统属性)。 +% 通过为可选查询参数withData提供值为true的值,用户定义的文档属性也将包括在计算中。 注意:包括用户定义的属性将使校验和变慢。 +% 响应是具有以下属性的JSON对象: +% checksum:计算得出的校验和(以数字形式)。 +% 版本:集合修订版本ID作为字符串。 +% 注意:此方法在群集中不可用。 +% GET /_api/collection/{collection-name}/checksum +collChecksum(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/checksum">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined). + + +collChecksum(PoolNameOrSocket, CollName, IsWithRevisions, IsWithData) -> case IsWithRevisions orelse IsWithData of false -> - Path = <<"/_api/collection/", CoolName/binary, "/checksum">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity); + Path = <<"/_api/collection/", CollName/binary, "/checksum">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined); _ -> - Path = <<"/_api/collection/", CoolName/binary, "/checksum?withRevisions=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary, "&withData=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary>>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity) + Path = <<"/_api/collection/", CollName/binary, "/checksum?withRevisions=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary, "&withData=", (erlang:atom_to_binary(IsWithRevisions, utf8))/binary>>, + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined) end. -%% 读取所有收藏夹 -%% 返回所有集合 -%% 查询参数 -%% -%%excludeSystem(可选):是否应从结果中排除系统集合。 -%% GET /_api/collection -collList(PoolName, CoolName, IsExcludeSystem) -> +% 返回所有集合 +% 查询参数 +% excludeSystem(可选):是否应从结果中排除系统集合。 +% GET /_api/collection +collList(PoolNameOrSocket) -> + agHttpCli:callAgency(PoolNameOrSocket, ?Get, <<"/_api/collection">>, [], undefined). + +collList(PoolNameOrSocket, IsExcludeSystem) -> case IsExcludeSystem of false -> - Path = <<"/_api/collection/", CoolName/binary, "/checksum">>, - agHttpCli:callAgency(PoolName, ?Get, <<"/_api/collection">>, [], undefined, infinity); + agHttpCli:callAgency(PoolNameOrSocket, ?Get, <<"/_api/collection">>, [], undefined); _ -> Path = <<"/_api/collection/?excludeSystem=true">>, - agHttpCli:callAgency(PoolName, ?Get, Path, [], undefined, infinity) + agHttpCli:callAgency(PoolNameOrSocket, ?Get, Path, [], undefined) end. -%% 加载收藏 -%% 请求主体对象可以选择包含以下属性: -%% count:如果设置,则控制返回值是否应包括集合中的文档数。将count设置为 false可以加快加载集合的速度。为默认值 数为真。 -%% PUT /_api/collection/{collection-name}/load -collLoad(PoolName, CoolName, IsCount) -> +% 加载集合 +% 请求主体对象可以选择包含以下属性: +% count:如果设置,则控制返回值是否应包括集合中的文档数。将count设置为 false可以加快加载集合的速度。为默认值 数为true。 +% PUT /_api/collection/{collection-name}/load +collLoad(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/load">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). + + +collLoad(PoolNameOrSocket, CollName, IsCount) -> case IsCount of false -> - Path = <<"/_api/collection/", CoolName/binary, "/load">>, - agHttpCli:callAgency(PoolName, ?Put, Path, [], <<"{\"count\":false}">>, infinity); + Path = <<"/_api/collection/", CollName/binary, "/load">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], <<"{\"count\":false}">>); _ -> - Path = <<"/_api/collection/", CoolName/binary, "/load">>, - agHttpCli:callAgency(PoolName, ?Put, Path, [], undefined, infinity) + Path = <<"/_api/collection/", CollName/binary, "/load">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined) end. +% 卸载集合 +% PUT /_api/collection/{collection-name}/unload +% 路径参数 +% collection-name(必需):从内存中删除一个集合。此调用不会删除任何文档。您可以随后使用该集合;在这种情况下,它将再次加载到内存中。成功后,将返回具有以下属性的对象: +% id:集合的标识符。 +% name:集合的名称。 +% status:集合状态为数字。 +% type:集合类型。有效类型为: +% 2:文件收集 +% 3:边缘收集 +% isSystem:如果为true,则该集合为系统集合。 + +collUnload(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/unload">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). + +% 将索引加载到内存中 +% PUT /_api/collection/{collection-name}/loadIndexesIntoMemory +% 路径参数 +% collection-name(必填):此路由尝试将这个collection的所有索引条目缓存到主内存中。 +% 因此,它将遍历集合的所有索引,并将索引值而不是整个文档数据存储在内存中。 +% 可以在缓存中找到的所有查找都比未存储在缓存中的查找要快得多,因此可以提高性能。 +% 还可以保证缓存与存储的数据一致。 +% 目前,此功能仅在RocksDB存储引擎上有用,因为在MMFiles引擎中,所有索引仍然在内存中。 +% 在RocksDB上,此函数将遵守所有内存限制,如果要加载的索引小于您的内存限制,则此函数可确保 +% 大多数索引值都已缓存。如果索引大于您的内存限制,则此函数将填满直至达到此限制的值, +% 并且暂时无法控制集合中的哪些索引应优先于其他索引。 + +collLoadIndexesIntoMemory(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/loadIndexesIntoMemory">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). + +% 更改集合的属性固定链接 +% PUT /_api/collection/{collection-name}/properties +% 路径参数 +% collection-name(必填):集合的名称。 +% 更改集合的属性。需要具有属性的对象 +% waitForSync:如果为true,则创建或更改文档将等待,直到数据已同步到磁盘。 +% journalSize:日志或数据文件的最大大小,以字节为单位。该值必须至少为1048576(1 MB)。请注意,更改journalSize值时,它仅对创建的其他日志或数据文件有效。现有的日志或数据文件将不受影响。 +% 成功后,将返回具有以下属性的对象: +% id:集合的标识符。 +% name:集合的名称。 +% waitForSync:新值。 +% journalSize:新值。 +% status:集合状态为数字。 +% type:集合类型。有效类型为: +% 2:文件收集 +% 3:边缘收集 +% isSystem:如果为true,则该集合为系统集合。 +% isVolatile:如果为true,则收集数据将仅保留在内存中,并且ArangoDB不会将数据写入或同步到磁盘。 +% doCompact:是否将压缩集合。 +% keyOptions:JSON对象,其中包含密钥生成选项: +% type:指定密钥生成器的类型。当前可用的生成器是传统的,自动递增的,uuid的 和填充的。 +% allowUserKeys:如果设置为true,则允许在文档的_key属性中提供自己的键值。如果设置为 false,那么密钥生成器将独自负责生成密钥,并且在文档的_key属性中提供自己的密钥值被视为错误。 +% 注意:除了waitForSync,journalSize和name之外,创建集合后就无法更改集合属性。要重命名集合,必须使用重命名端点。 + +collChangeProperties(PoolNameOrSocket, CollName, Args) -> + Path = <<"/_api/collection/", CollName/binary, "/properties">>, + BodyStr = jiffy:encode(Args), + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], BodyStr). + +% 重命名集合 +% PUT /_api/collection/{collection-name}/rename +% 路径参数 +% collection-name(必填):要重命名的集合的名称。 +% 重命名集合。需要具有属性的对象 +% name:新名称。 +% 它返回具有属性的对象 +% id:集合的标识符。 +% name:集合的新名称。 +% status:集合状态为数字。 +% type:集合类型。有效类型为: +% 2:文件收集 +% 3:边缘收集 +% isSystem:如果为true,则该集合为系统集合。 +% 如果重命名集合成功,那么该集合还将_graphs在当前数据库中该集合内的所有图形定义中重命名。 +% 注意:此方法在群集中不可用。 +collRename(PoolNameOrSocket, OldName, NewName) -> + Path = <<"/_api/collection/", OldName/binary, "/rename">>, + NameStr = jiffy:encode(NewName), + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], <<"{\"name\":", NameStr/binary, "}">>). + +% 旋转收藏夹的日记 +% PUT /_api/collection/{collection-name}/rotate +% 路径参数 +% collection-name(必填):集合的名称。 +% 旋转集合的日记帐。集合的当前日志将关闭,并成为只读数据文件。Rotate方法的目的是使文件中的数据可用于压缩(压缩仅对只读数据文件而不对日志执行)。 +% 如果没有当前日志,随后将新数据保存在集合中将自动创建一个新的日志文件。 +% 它返回具有属性的对象 +% 结果:如果旋转成功,则为true +% 注意:此方法特定于MMFiles存储引擎,并且在群集中不可用。 +% 返回码 +% 400:如果该集合当前没有日记,则返回HTTP 400。 +% 404:如果集合名称未知,则返回HTTP 404。 +collRotate(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/rotate">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). + + +% 重新计算集合的文档数 +% PUT /_api/collection/{collection-name}/recalculateCount +% 路径参数 +% collection-name(必填):集合的名称。 +% 重新计算集合的文档计数(如果不一致)。 +% 它返回具有属性的对象 +% 结果:如果重新计算文档计数成功,则为true。 +% 注意:此方法特定于RocksDB存储引擎 +collRecalculateCount(PoolNameOrSocket, CollName) -> + Path = <<"/_api/collection/", CollName/binary, "/recalculateCount">>, + agHttpCli:callAgency(PoolNameOrSocket, ?Put, Path, [], undefined). \ No newline at end of file diff --git a/src/httpCli/agAgencyPoolMgrIns.erl b/src/httpCli/agAgencyPoolMgrIns.erl index 21e20f7..f2551b6 100644 --- a/src/httpCli/agAgencyPoolMgrIns.erl +++ b/src/httpCli/agAgencyPoolMgrIns.erl @@ -43,6 +43,11 @@ handleMsg(_Msg, State) -> {ok, State}. terminate(_Reason, _State) -> + io:format("IMY******************* agAgencyPoolMgrIns terminate ~p~n ", [_Reason]), + ets:delete_all_objects(?ETS_AG_Pool), + ets:delete_all_objects(?ETS_AG_Agency), + agKvsToBeam:load(?agBeamPool, []), + agKvsToBeam:load(?agBeamAgency, []), ok. -spec startPool(poolName(), dbCfgs()) -> ok | {error, pool_name_used}. @@ -152,6 +157,7 @@ cacheDelAgency(PoolName) -> agKvsToBeam:load(?agBeamAgency, KVS), ok. +-spec getOneAgency(atom()) -> atom() | {error, term()}. getOneAgency(PoolName) -> case ?agBeamPool:get(PoolName) of undefined -> diff --git a/src/httpCli/agAgencyUtils.erl b/src/httpCli/agAgencyUtils.erl index 9d910e7..d7e74c7 100644 --- a/src/httpCli/agAgencyUtils.erl +++ b/src/httpCli/agAgencyUtils.erl @@ -14,29 +14,34 @@ , reconnectTimer/2 , agencyReply/2 , agencyReply/4 - , agencyReplyAll/1 , initReconnectState/3 , resetReconnectState/1 , updateReconnectState/1 ]). +-spec getQueue(pos_integer()) -> undefined | miRequest(). getQueue(RequestsIn) -> erlang:get(RequestsIn). +-spec addQueue(pos_integer(), miRequest()) -> undefined. addQueue(RequestsIn, MiRequest) -> erlang:put(RequestsIn, MiRequest). +-spec delQueue(pos_integer()) -> miRequest(). delQueue(RequestsIn) -> erlang:erase(RequestsIn). +-spec clearQueue() -> term(). clearQueue() -> erlang:erase(). -dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) -> - agAgencyUtils:agencyReply(CurInfo, Reply), - agAgencyUtils:agencyReplyAll(Reply), - reconnectTimer(SrvState, ClientState). +-spec dealClose(srvState(), cliState(), term()) -> {ok, srvState(), cliState()}. +dealClose(SrvState, #cliState{curInfo = CurInfo} = ClientState, Reply) -> + agencyReply(CurInfo, Reply), + agencyReplyAll(Reply), + reconnectTimer(SrvState, ClientState#cliState{requestsIn = 1, requestsOut = 0, backlogNum = 0}). +-spec reconnectTimer(srvState(), cliState()) -> {ok, srvState(), cliState()}. reconnectTimer(#srvState{reconnectState = undefined} = SrvState, CliState) -> {ok, {SrvState#srvState{socket = undefined}, CliState}}; reconnectTimer(#srvState{reconnectState = ReconnectState} = SrvState, CliState) -> diff --git a/src/httpCli/agHttpCli.erl b/src/httpCli/agHttpCli.erl index 3f2839b..e91a7a5 100644 --- a/src/httpCli/agHttpCli.erl +++ b/src/httpCli/agHttpCli.erl @@ -1,5 +1,6 @@ -module(agHttpCli). -include("agHttpCli.hrl"). +-include("erlArango.hrl"). -compile(inline). -compile({inline_size, 128}). @@ -82,8 +83,8 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout _ -> case getCurDbInfo(PoolNameOrSocket) of {DbName, UserPassWord, Host, Protocol} -> - Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), - io:format("IMY*******************************~n~p ~n",[Request]), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + io:format("IMY*******************************~n~p ~n", [Request]), case Protocol of tcp -> case gen_tcp:send(PoolNameOrSocket, Request) of @@ -95,7 +96,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); + receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?Head); {error, Reason} = Err -> ?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]), disConnectDb(PoolNameOrSocket), @@ -111,7 +112,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>)); + receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?Head); {error, Reason} = Err -> ?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]), disConnectDb(PoolNameOrSocket), @@ -130,16 +131,16 @@ receiveResponse(RequestId) -> Reply end. --spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}. -receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) -> +-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> requestRet() | {error, term()}. +receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) -> receive {tcp, Socket, Data} -> - io:format("IMY******************************* ~p ~n ",[Data]), - try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + io:format("IMY******************************* ~p ~n ", [Data]), + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} -> - #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers= Headers, body = Body}; + #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}; {ok, NewRecvState} -> - receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn); + receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod); {error, Reason} -> ?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]), disConnectDb(Socket), @@ -160,15 +161,15 @@ receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn) -> {error, {tcp_error, Reason}} end. --spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp()) -> requestRet() | {error, term()}. -receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn) -> +-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> requestRet() | {error, term()}. +receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) -> receive {ssl, Socket, Data} -> - try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}; {ok, NewRecvState} -> - receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn); + receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod); {error, Reason} -> ?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]), disConnectDb(Socket), @@ -201,7 +202,7 @@ startPool(PoolName, DbCfgs, AgencyCfgs) -> stopPool(PoolName) -> agAgencyPoolMgrIns:stopPool(PoolName). --spec connectDb(dbCfgs()) -> {ok, socket()} | error. +-spec connectDb(dbCfgs()) -> {ok, socket()} | {error, term()}. connectDb(DbCfgs) -> #dbOpts{ host = Host, @@ -240,6 +241,7 @@ connectDb(DbCfgs) -> Err end. +-spec disConnectDb(socket()) -> ok | {error, term()}. disConnectDb(Socket) -> case erlang:erase({'$agDbInfo', Socket}) of undefined -> @@ -253,12 +255,15 @@ disConnectDb(Socket) -> end end. +-spec setCurDbInfo(socket(), binary(), tuple(), host(), protocol()) -> term(). setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) -> erlang:put({'$agDbInfo', Socket}, {DbName, UserPassword, Host, Protocol}). +-spec getCurDbInfo(socket()) -> term(). getCurDbInfo(Socket) -> erlang:get({'$agDbInfo', Socket}). +-spec setCurDbName(socket(), binary()) -> ok. setCurDbName(Socket, NewDbName) -> case erlang:get({'$agDbInfo', Socket}) of undefined -> diff --git a/src/httpCli/agHttpProtocol.erl b/src/httpCli/agHttpProtocol.erl index 951686c..5616a50 100644 --- a/src/httpCli/agHttpProtocol.erl +++ b/src/httpCli/agHttpProtocol.erl @@ -7,8 +7,8 @@ -export([ headers/1 , request/7 - , response/1 - , response/4 + , response/2 + , response/5 ]). %% <<"Content-Type: application/json; charset=utf-8">>, @@ -45,12 +45,12 @@ request(true, Body, Method, Host, _DbName, Path, Headers) -> ]. --spec response(binary()) -> {ok, recvState(), binary()} | error(). -response(Data) -> - response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data). +-spec response(binary(), boolean()) -> {ok, recvState(), binary()} | error(). +response(Data, IsHeadMethod) -> + response(undefined, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Data, IsHeadMethod). --spec response(undefined | recvState(), binary:cp(), binary:cp(), binary()) -> {ok, recvState()} | error(). -response(undefined, Rn, RnRn, Data) -> +-spec response(undefined | recvState(), binary:cp(), binary:cp(), binary(), boolean()) -> {ok, recvState()} | error(). +response(undefined, Rn, RnRn, Data, IsHeadMethod) -> case parseStatusLine(Data, Rn) of {StatusCode, Rest} -> case splitHeaders(Rest, Rn, RnRn) of @@ -59,8 +59,13 @@ response(undefined, Rn, RnRn, Data) -> {0, Headers, Rest} -> {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}}; {chunked, Headers, Body} -> - RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, - response(RecvState, Rn, RnRn, Body); + case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of + true -> + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}}; + _ -> + RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, + response(RecvState, Rn, RnRn, Body, IsHeadMethod) + end; {ContentLength, Headers, Body} -> BodySize = erlang:size(Body), if @@ -70,7 +75,12 @@ response(undefined, Rn, RnRn, Data) -> ?WARN(agTcpAgencyIns, "11 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]), {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; true -> - {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} + case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of + true -> + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; + _ -> + {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} + end end; not_enough_data -> %% headers都不足 这也可以能发生么 @@ -82,7 +92,7 @@ response(undefined, Rn, RnRn, Data) -> {error, Reason} -> {error, Reason} end; -response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = Buffer} = RecvState, Rn, _RnRn, Data) -> +response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = Buffer} = RecvState, Rn, _RnRn, Data, _IsHeadMethod) -> NewBuffer = <>, case parseChunks(NewBuffer, Rn, []) of {ok, AddBody, _Rest} -> @@ -94,7 +104,7 @@ response(#recvState{stage = body, contentLength = chunked, body = Body, buffer = {error, Reason} -> {error, Reason} end; -response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data) -> +response(#recvState{stage = body, contentLength = ContentLength, body = Body} = RecvState, _Rn, _RnRn, Data, _IsHeadMethod) -> CurData = <>, BodySize = erlang:size(CurData), if @@ -106,7 +116,7 @@ response(#recvState{stage = body, contentLength = ContentLength, body = Body} = true -> {ok, RecvState#recvState{body = CurData}} end; -response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) -> +response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data, IsHeadMethod) -> CurBody = <>, case parseStatusLine(CurBody, Rn) of {StatusCode, Rest} -> @@ -116,8 +126,13 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) -> {0, Headers, Body} -> {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Body}}; {chunked, Headers, Rest} -> - RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, - response(RecvState, Rn, RnRn, Rest); + case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of + true -> + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = 0, body = Rest}}; + _ -> + RecvState = #recvState{stage = body, contentLength = chunked, statusCode = StatusCode, headers = Headers}, + response(RecvState, Rn, RnRn, Rest, IsHeadMethod) + end; {ContentLength, Headers, Body} -> BodySize = erlang:size(Body), if @@ -127,7 +142,12 @@ response(#recvState{stage = header, body = OldBody}, Rn, RnRn, Data) -> ?WARN(agTcpAgencyIns, "33 contentLength get to long data why? more: ~p ~n", [BodySize - ContentLength]), {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; true -> - {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} + case IsHeadMethod orelse StatusCode == 204 orelse StatusCode == 304 orelse (StatusCode >= 100 andalso StatusCode < 200) of + true -> + {done, #recvState{stage = done, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}}; + _ -> + {ok, #recvState{stage = body, statusCode = StatusCode, headers = Headers, contentLength = ContentLength, body = Body}} + end end; not_enough_data -> %% headers都不足 这也可以能发生么 diff --git a/src/httpCli/agMiscUtils.erl b/src/httpCli/agMiscUtils.erl index d8c09a3..fffb3db 100644 --- a/src/httpCli/agMiscUtils.erl +++ b/src/httpCli/agMiscUtils.erl @@ -22,6 +22,7 @@ parseUrl(<<"https://", Rest/binary>>) -> parseUrl(_) -> {error, invalid_url}. +-spec parseUrl(protocol(), binary()) -> dbOpts(). parseUrl(Protocol, Rest) -> {Host, _Path} = case binary:split(Rest, <<"/">>, [trim]) of @@ -45,6 +46,7 @@ parseUrl(Protocol, Rest) -> end, #dbOpts{host = Host, port = Port, hostname = binary_to_list(Hostname), protocol = Protocol}. +-spec dbOpts(list()) -> dbOpts(). dbOpts(DbCfgs) -> BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL), DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME), @@ -52,9 +54,10 @@ dbOpts(DbCfgs) -> PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE), SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS), DbOpts = agMiscUtils:parseUrl(BaseUrl), - UserPasswordBase64 = <<"Basic ", (base64:encode(UserPassword))/binary>>, + UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(UserPassword))/binary>>}, DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}. +-spec agencyOpts(list()) -> agencyOpts(). agencyOpts(AgencyCfgs) -> IsReconnect = ?GET_FROM_LIST(reconnect, AgencyCfgs, ?DEFAULT_IS_RECONNECT), BacklogSize = ?GET_FROM_LIST(backlogSize, AgencyCfgs, ?DEFAULT_BACKLOG_SIZE), @@ -62,7 +65,7 @@ agencyOpts(AgencyCfgs) -> Max = ?GET_FROM_LIST(reconnectTimeMax, AgencyCfgs, ?DEFAULT_RECONNECT_MAX), #agencyOpts{reconnect = IsReconnect, backlogSize = BacklogSize, reconnectTimeMin = Min, reconnectTimeMax = Max}. - +-spec getListValue(term(), list(), term()) -> term(). getListValue(Key, List, Default) -> case lists:keyfind(Key, 1, List) of false -> @@ -82,6 +85,7 @@ randomElement([_ | _] = List) -> T = list_to_tuple(List), element(rand:uniform(tuple_size(T)), T). +-spec toBinary(term()) -> binary(). toBinary(Value) when is_integer(Value) -> integer_to_binary(Value); toBinary(Value) when is_list(Value) -> list_to_binary(Value); toBinary(Value) when is_float(Value) -> float_to_binary(Value, [{decimals, 6}, compact]); diff --git a/src/httpCli/agSslAgencyIns.erl b/src/httpCli/agSslAgencyIns.erl index ca76c26..c4cb326 100644 --- a/src/httpCli/agSslAgencyIns.erl +++ b/src/httpCli/agSslAgencyIns.erl @@ -1,5 +1,6 @@ -module(agSslAgencyIns). -include("agHttpCli.hrl"). +-include("erlArango.hrl"). -compile(inline). -compile({inline_size, 128}). @@ -34,7 +35,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod _ -> case Status of leisure -> %% 空闲模式 - Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = @@ -44,7 +45,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), ssl:close(Socket), @@ -59,8 +60,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod end; handleMsg({ssl, Socket, Data}, #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> - try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}), case agAgencyUtils:getQueue(RequestsOut + 1) of @@ -166,7 +167,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) end; _ -> - Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case ssl:send(Socket, Request) of ok -> TimerRef = @@ -176,7 +177,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), ssl:close(Socket), diff --git a/src/httpCli/agTcpAgencyIns.erl b/src/httpCli/agTcpAgencyIns.erl index cab5729..8992220 100644 --- a/src/httpCli/agTcpAgencyIns.erl +++ b/src/httpCli/agTcpAgencyIns.erl @@ -1,5 +1,6 @@ -module(agTcpAgencyIns). -include("agHttpCli.hrl"). +-include("erlArango.hrl"). -compile(inline). -compile({inline_size, 128}). @@ -34,8 +35,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod _ -> case Status of leisure -> %% 空闲模式 - Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), - io:format("IMY*******************************~n~p ~n",[Request]), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + io:format("IMY*******************************~n~p ~n", [Request]), case gen_tcp:send(Socket, Request) of ok -> TimerRef = @@ -45,7 +46,7 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, backlogNum = BacklogNum + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p ~p ~p ~n", [Reason, FromPid, RequestId]), gen_tcp:close(Socket), @@ -60,8 +61,8 @@ handleMsg(#miRequest{method = Method, path = Path, headers = Headers, body = Bod end; handleMsg({tcp, Socket, Data}, #srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, - #cliState{backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> - try agHttpProtocol:response(RecvState, Rn, RnRn, Data) of + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, body = Body}} -> agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, body = Body}), case agAgencyUtils:getQueue(RequestsOut + 1) of @@ -88,12 +89,12 @@ handleMsg({timeout, TimerRef, waiting_over}, agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), %% 之前的数据超时之后 要关闭tcp 然后重新建立连接 以免后面该tcp收到该次超时数据 影响后面请求的接收数据 导致数据错乱 gen_tcp:close(Socket), - self() ! ?miDoNetConnect, - {ok, SrvState, CliState#cliState{backlogNum = BacklogNum - 1}}; + handleMsg(?miDoNetConnect, SrvState#srvState{socket = undefined}, CliState#cliState{backlogNum = BacklogNum - 1}); handleMsg({tcp_closed, Socket}, #srvState{socket = Socket, serverName = ServerName} = SrvState, CliState) -> ?WARN(ServerName, "connection closed~n", []), + gen_tcp:close(Socket), agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}); handleMsg({tcp_error, Socket, Reason}, #srvState{socket = Socket, serverName = ServerName} = SrvState, @@ -129,12 +130,129 @@ handleMsg(Msg, #srvState{serverName = ServerName} = SrvState, CliState) -> -spec terminate(term(), srvState(), cliState()) -> ok. terminate(_Reason, - #srvState{timerRef = TimerRef} = SrvState, - _CliState = CliState) -> + #srvState{socket = Socket, timerRef = TimerRef} = SrvState, + #cliState{requestsOut = RequestsOut, status = Status} = CliState) -> + io:format("IMY*******************terminate ~p~n", [_Reason]), + {ok, NewSrvState, NewCliState} = overAllWork(SrvState, CliState), + gen_tcp:close(Socket), agAgencyUtils:cancelTimer(TimerRef), - agAgencyUtils:dealClose(SrvState, CliState, {error, shutdown}), + agAgencyUtils:dealClose(NewSrvState, NewCliState, {error, shutdown}), ok. +-spec overAllWork(srvState(), cliState()) -> ok. +overAllWork(#srvState{serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket, timerRef = TimerRef} = SrvState, + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState, status = Status} = CliState) -> + case Status of + leisure -> + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + ok; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState) + end; + _ -> + overReceiveTcpData(SrvState, CliState) + end, + ok. + +-spec overDealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. +overDealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, + #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, + #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> + agAgencyUtils:delQueue(RequestsOut + 1), + case erlang:system_time(millisecond) > OverTime of + true -> + %% 超时了 + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, timeout}), + case agAgencyUtils:getQueue(RequestsOut + 2) of + undefined -> + ok; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) + end; + _ -> + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), + case gen_tcp:send(Socket, Request) of + ok -> + TimerRef = + case OverTime of + infinity -> + undefined; + _ -> + erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) + end, + overReceiveTcpData(SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}); + {error, Reason} -> + ?WARN(ServerName, ":send error: ~p~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(FromPid, RequestId, undefined, {error, socket_send_error}), + agAgencyUtils:dealClose(SrvState, CliState, {error, socket_send_error}) + end + end. + + +-spec overReceiveTcpData(srvState(), cliState()) -> term(). +overReceiveTcpData(#srvState{poolName = PoolName, serverName = ServerName, rn = Rn, rnrn = RnRn, socket = Socket} = SrvState, + #cliState{isHeadMethod = IsHeadMethod, backlogNum = BacklogNum, curInfo = CurInfo, requestsOut = RequestsOut, recvState = RecvState} = CliState) -> + receive + {tcp, Socket, Data} -> + try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of + {done, #recvState{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}} -> + agAgencyUtils:agencyReply(CurInfo, #requestRet{statusCode = StatusCode, contentLength = ContentLength, headers = Headers, body = Body}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + %% todo bug fix + ok; + MiRequest -> + overDealQueueRequest(MiRequest, SrvState, CliState#cliState{backlogNum = BacklogNum - 1, status = leisure, curInfo = undefined, recvState = undefined}) + end; + {ok, NewRecvState} -> + overReceiveTcpData(SrvState, CliState#cliState{recvState = NewRecvState}); + {error, Reason} -> + ?WARN(overReceiveTcpData, "handle tcp data error: ~p ~n", [Reason]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_data_error, Reason}}) + catch + E:R:S -> + ?WARN(overReceiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]), + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, handledata_error}}), + ok + end; + {timeout, _TimerRef, waiting_over} -> + gen_tcp:close(Socket), + agAgencyUtils:agencyReply(CurInfo, {error, timeout}), + case agAgencyUtils:getQueue(RequestsOut + 1) of + undefined -> + ok; + MiRequest -> + case ?agBeamPool:get(PoolName) of + #dbOpts{port = Port, hostname = HostName, socketOpts = SocketOpts} -> + case dealConnect(ServerName, HostName, Port, SocketOpts) of + {ok, NewSocket} -> + %% 新建连接之后 需要重置之前的buff之类状态数据 + NewCliState = CliState#cliState{status = leisure, recvState = undefined, curInfo = undefined}, + overDealQueueRequest(MiRequest, SrvState#srvState{socket = NewSocket}, NewCliState); + {error, _Reason} -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {new_tcp_connect_error_over, _Reason}}), + ok + end; + _Ret -> + agAgencyUtils:dealClose(SrvState, CliState, {error, {not_found_poolName, PoolName}}), + ok + end + end; + {tcp_closed, Socket} -> + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, tcp_closed}), + ok; + {tcp_error, Socket, Reason} -> + gen_tcp:close(Socket), + agAgencyUtils:dealClose(SrvState, CliState, {error, {tcp_error, Reason}}), + ok + end. + +-spec dealConnect(atom(), hostName(), port(), socketOpts()) -> {ok, socket()} | {error, term()}. dealConnect(ServerName, HostName, Port, SocketOptions) -> case inet:getaddrs(HostName, inet) of {ok, IPList} -> @@ -151,9 +269,10 @@ dealConnect(ServerName, HostName, Port, SocketOptions) -> {error, Reason} end. +-spec dealQueueRequest(miRequest(), srvState(), cliState()) -> {ok, srvState(), cliState()}. dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = FromPid, overTime = OverTime, isSystem = IsSystem}, #srvState{serverName = ServerName, host = Host, userPassWord = UserPassWord, dbName = DbName, socket = Socket} = SrvState, - #cliState{requestsOut = RequestsOut} = CliState) -> + #cliState{requestsOut = RequestsOut, backlogNum = BacklogNum} = CliState) -> agAgencyUtils:delQueue(RequestsOut + 1), case erlang:system_time(millisecond) > OverTime of true -> @@ -163,10 +282,10 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod undefined -> {ok, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}}; MiRequest -> - dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1}) + dealQueueRequest(MiRequest, SrvState, CliState#cliState{requestsOut = RequestsOut + 1, backlogNum = BacklogNum - 1}) end; _ -> - Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [{<<"Authorization">>, UserPassWord} | Headers]), + Request = agHttpProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]), case gen_tcp:send(Socket, Request) of ok -> TimerRef = @@ -176,7 +295,7 @@ dealQueueRequest(#miRequest{method = Method, path = Path, headers = Headers, bod _ -> erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}]) end, - {ok, SrvState, CliState#cliState{status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; + {ok, SrvState, CliState#cliState{isHeadMethod = Method == ?Head, status = waiting, requestsOut = RequestsOut + 1, curInfo = {FromPid, RequestId, TimerRef}}}; {error, Reason} -> ?WARN(ServerName, ":send error: ~p~n", [Reason]), gen_tcp:close(Socket), diff --git a/src/httpCli/test.erl b/src/httpCli/test.erl index 24fc92f..5e933df 100644 --- a/src/httpCli/test.erl +++ b/src/httpCli/test.erl @@ -5,7 +5,7 @@ start() -> application:ensure_all_started(erlArango), - agHttpCli:startPool(tt, [{poolSize, 100}], []). + agHttpCli:startPool(tt, [{poolSize, 10}], []). tt(C, N) -> application:start(erlArango),