Browse Source

简单修改

erlArango_v1
SisMaker 5 years ago
parent
commit
1e551b109d
5 changed files with 84 additions and 65 deletions
  1. +6
    -4
      include/agHttpCli.hrl
  2. +62
    -28
      src/arangoApi/agDbMgr.erl
  3. +1
    -1
      src/httpCli/agAgencyPoolMgrIns.erl
  4. +11
    -29
      src/httpCli/agHttpCli.erl
  5. +4
    -3
      src/httpCli/agMiscUtils.erl

+ 6
- 4
include/agHttpCli.hrl View File

@ -7,8 +7,9 @@
%% %%
-define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>). -define(DEFAULT_BASE_URL, <<"http://120.77.213.39:8529">>).
-define(DEFAULT_DBNAME, <<"_db/_system">>).
-define(USER_PASSWORD, <<"root:156736">>).
-define(DEFAULT_DBNAME, <<"_system">>).
-define(DEFAULT_USER, <<"root">>).
-define(DEFAULT_PASSWORD, <<"156736">>).
-define(DEFAULT_BACKLOG_SIZE, 1024). -define(DEFAULT_BACKLOG_SIZE, 1024).
-define(DEFAULT_CONNECT_TIMEOUT, 5000). -define(DEFAULT_CONNECT_TIMEOUT, 5000).
-define(DEFAULT_POOL_SIZE, 16). -define(DEFAULT_POOL_SIZE, 16).
@ -17,7 +18,7 @@
-define(DEFAULT_RECONNECT_MAX, 120000). -define(DEFAULT_RECONNECT_MAX, 120000).
-define(DEFAULT_TIMEOUT, infinity). -define(DEFAULT_TIMEOUT, infinity).
-define(DEFAULT_PID, self()). -define(DEFAULT_PID, self()).
-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf, 2097152}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(DEFAULT_SOCKET_OPTS, [binary, {active, true}, {nodelay, true}, {delay_send, true}, {keepalive, true}, {recbuf,1048576}, {send_timeout, 5000}, {send_timeout_close, true}]).
-define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)). -define(GET_FROM_LIST(Key, List), agMiscUtils:getListValue(Key, List, undefined)).
-define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)). -define(GET_FROM_LIST(Key, List, Default), agMiscUtils:getListValue(Key, List, Default)).
@ -126,7 +127,8 @@
-type dbCfg() :: -type dbCfg() ::
{baseUrl, binary()} | {baseUrl, binary()} |
{dbName, binary()} | {dbName, binary()} |
{userPassword, binary()} |
{user, binary()} |
{password, binary()} |
{poolSize, poolSize()} | {poolSize, poolSize()} |
{socketOpts, [gen_tcp:connect_option(), ...]}. {socketOpts, [gen_tcp:connect_option(), ...]}.

+ 62
- 28
src/arangoApi/agDbMgr.erl View File

@ -7,48 +7,82 @@
%% doc_address:https://www.arangodb.com/docs/stable/http/database-database-management.html %% doc_address:https://www.arangodb.com/docs/stable/http/database-database-management.html
%% /_api/database/properties
%% GET /_api/database/current
%
% /_ap/database/properties
% GET /_api/database/current
%
% JSON对象
% name
% idID
% path
% isSystem_system数据库
% sharding
% ReplicationFactor
% writeConcern
%
% 200
% 400
% 404
curDbInfo(PoolNameOrSocket) -> curDbInfo(PoolNameOrSocket) ->
agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database/current">>, [], undefined). agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database/current">>, [], undefined).
%% 访
%% GET /_api/database/user
curVisitDbs(PoolNameOrSocket) ->
% 访
% GET /_api/database/user
% 访
%
% 200
% 400
visitDbs(PoolNameOrSocket) ->
agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database/user">>, [], undefined). agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database/user">>, [], undefined).
%%
%% GET /_api/database
curDbList(PoolNameOrSocket) ->
%
% GET /_api/database
%
% _system数据库中检索数据库列表
% 使GET用户API来获取可用数据库的列表
%
% 200
% 400
% 403_system数据库中执行
allDbs(PoolNameOrSocket) ->
agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database">>, [], undefined). agHttpCli:callAgency(PoolNameOrSocket, ?AgGet, <<"/_api/database">>, [], undefined).
%%
%% POST /_api/database
%
% POST /_api/database
% JSON对象是必需的 % JSON对象是必需的
% name % name
% options % options
% sharding flexible single
% ReplicationFactor
% satellite satellite 1
% writeConcern
% DBServer上同步每个分片需要多少个副本
% writeConcern的值 ReplicationFactor
% users
% users或不包含任何用户使 root访
%
% username
% passwd
% activetruefalse
% extraJSON对象Extra中包含的数据 ArangoDB不会进一步解释
% sharding flexible single
% ReplicationFactor satellite satellite 1
% writeConcernDBServer上同步每个分片需要多少个副本writeConcern的值 ReplicationFactor
% usersusers或不包含任何用户使 root访
% username
% passwd
% activetruefalse
% extraJSON对象Extra中包含的数据 ArangoDB不会进一步解释
%
% JSON对象true
% _system数据库中创建新数据库
%
% 201
% 400
% 403_system数据库中执行
% 409
newDb(PoolNameOrSocket, MapData) -> newDb(PoolNameOrSocket, MapData) ->
BodyStr = jiffy:encode(MapData), BodyStr = jiffy:encode(MapData),
agHttpCli:callAgency(PoolNameOrSocket, ?AgPost, <<"/_api/database">>, [], BodyStr, true). agHttpCli:callAgency(PoolNameOrSocket, ?AgPost, <<"/_api/database">>, [], BodyStr, true).
%%
%% DELETE /_api/database/{database-name}
%
% DELETE /_api/database/{database-name}
%
% database-name
%
% _system数据库中删除数据库_SYSTEM数据库本身不能被丢弃
%
% 200
% 400
% 403_system数据库中执行
% 404
delDb(PoolNameOrSocket, Name) -> delDb(PoolNameOrSocket, Name) ->
Path = <<"/_api/database/", Name/binary>>, Path = <<"/_api/database/", Name/binary>>,
agHttpCli:callAgency(PoolNameOrSocket, ?AgDelete, Path, [], undefined, true). agHttpCli:callAgency(PoolNameOrSocket, ?AgDelete, Path, [], undefined, true).

+ 1
- 1
src/httpCli/agAgencyPoolMgrIns.erl View File

@ -112,7 +112,7 @@ agencyMod(_) ->
agencySpec(ServerMod, ServerName, Args) -> agencySpec(ServerMod, ServerName, Args) ->
%% TODO spawn_opt %% TODO spawn_opt
StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 10240}, {min_bin_vheap_size, 524288}, {fullsweep_after, 512}]]},
StartFunc = {ServerMod, start_link, [ServerName, Args, [{min_heap_size, 10240}, {min_bin_vheap_size, 524288}, {fullsweep_after, 1024}]]},
{ServerName, StartFunc, transient, infinity, worker, [ServerMod]}. {ServerName, StartFunc, transient, infinity, worker, [ServerMod]}.
-spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok. -spec startChildren(atom(), protocol(), poolSize(), agencyOpts()) -> ok.

+ 11
- 29
src/httpCli/agHttpCli.erl View File

@ -40,7 +40,7 @@ callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
-spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}. -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}.
callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) -> callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
case castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout) of case castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout) of
{ok, RequestId, MonitorRef} ->
{waitRRT, RequestId, MonitorRef} ->
receiveRequestRet(RequestId, MonitorRef); receiveRequestRet(RequestId, MonitorRef);
{error, _Reason} = Err -> {error, _Reason} = Err ->
Err; Err;
@ -79,7 +79,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
MonitorRef = erlang:monitor(process, AgencyName), MonitorRef = erlang:monitor(process, AgencyName),
RequestId = {AgencyName, MonitorRef}, RequestId = {AgencyName, MonitorRef},
catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem}, catch AgencyName ! #miRequest{method = Method, path = Path, headers = Headers, body = Body, requestId = RequestId, fromPid = Pid, overTime = OverTime, isSystem = IsSystem},
{ok, RequestId, MonitorRef}
{waitRRT, RequestId, MonitorRef}
end; end;
_ -> _ ->
case getCurDbInfo(PoolNameOrSocket) of case getCurDbInfo(PoolNameOrSocket) of
@ -89,14 +89,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
tcp -> tcp ->
case gen_tcp:send(PoolNameOrSocket, Request) of case gen_tcp:send(PoolNameOrSocket, Request) of
ok -> ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveTcpData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
receiveTcpData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
{error, Reason} = Err -> {error, Reason} = Err ->
?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]), ?WARN(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket), disConnectDb(PoolNameOrSocket),
@ -105,14 +98,7 @@ castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout
ssl -> ssl ->
case ssl:send(PoolNameOrSocket, Request) of case ssl:send(PoolNameOrSocket, Request) of
ok -> ok ->
TimerRef =
case OverTime of
infinity ->
undefined;
_ ->
erlang:start_timer(OverTime, self(), waiting_over, [{abs, true}])
end,
receiveSslData(undefined, PoolNameOrSocket, TimerRef, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
receiveSslData(undefined, PoolNameOrSocket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), Method == ?AgHead);
{error, Reason} = Err -> {error, Reason} = Err ->
?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]), ?WARN(castAgency, ":ssl send error: ~p ~n", [Reason]),
disConnectDb(PoolNameOrSocket), disConnectDb(PoolNameOrSocket),
@ -139,15 +125,15 @@ receiveRequestRet(RequestId, MonitorRef) ->
{error, {agencyDown, Reason}} {error, {agencyDown, Reason}}
end. end.
-spec receiveTcpData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
-spec receiveTcpData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive receive
{tcp, Socket, Data} -> {tcp, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{headers = Headers, body = Body}} -> {done, #recvState{headers = Headers, body = Body}} ->
{ok, Headers, jiffy:decode(Body, [return_maps, copy_strings])}; {ok, Headers, jiffy:decode(Body, [return_maps, copy_strings])};
{ok, NewRecvState} -> {ok, NewRecvState} ->
receiveTcpData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod);
receiveTcpData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} -> {error, Reason} ->
?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]), ?WARN(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket), disConnectDb(Socket),
@ -158,8 +144,6 @@ receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
disConnectDb(Socket), disConnectDb(Socket),
{error, handledata_error} {error, handledata_error}
end; end;
{timeout, TimerRef, waiting_over} ->
{error, timeout};
{tcp_closed, Socket} -> {tcp_closed, Socket} ->
disConnectDb(Socket), disConnectDb(Socket),
{error, tcp_closed}; {error, tcp_closed};
@ -168,15 +152,15 @@ receiveTcpData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
{error, {tcp_error, Reason}} {error, {tcp_error, Reason}}
end. end.
-spec receiveSslData(recvState() | undefined, socket(), reference() | undefined, binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
-spec receiveSslData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) -> {ok, term(), term()} | {error, term()}.
receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
receive receive
{ssl, Socket, Data} -> {ssl, Socket, Data} ->
try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of try agHttpProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
{done, #recvState{headers = Headers, body = Body}} -> {done, #recvState{headers = Headers, body = Body}} ->
{ok, Headers, jiffy:decode(Body, [return_maps, copy_strings])}; {ok, Headers, jiffy:decode(Body, [return_maps, copy_strings])};
{ok, NewRecvState} -> {ok, NewRecvState} ->
receiveSslData(NewRecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod);
receiveSslData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
{error, Reason} -> {error, Reason} ->
?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]), ?WARN(receiveSslData, "handle tcp data error: ~p ~n", [Reason]),
disConnectDb(Socket), disConnectDb(Socket),
@ -187,8 +171,6 @@ receiveSslData(RecvState, Socket, TimerRef, Rn, RnRn, IsHeadMethod) ->
disConnectDb(Socket), disConnectDb(Socket),
{error, handledata_error} {error, handledata_error}
end; end;
{timeout, TimerRef, waiting_over} ->
{error, timeout};
{ssl_closed, Socket} -> {ssl_closed, Socket} ->
disConnectDb(Socket), disConnectDb(Socket),
{error, ssl_closed}; {error, ssl_closed};
@ -276,6 +258,6 @@ setCurDbName(Socket, NewDbName) ->
undefined -> undefined ->
ignore; ignore;
{_DbName, UserPassword, Host, Protocol} -> {_DbName, UserPassword, Host, Protocol} ->
erlang:put({'$agDbInfo', Socket}, {NewDbName, UserPassword, Host, Protocol})
erlang:put({'$agDbInfo', Socket}, {<<"_db/", NewDbName/binary>>, UserPassword, Host, Protocol})
end, end,
ok. ok.

+ 4
- 3
src/httpCli/agMiscUtils.erl View File

@ -53,12 +53,13 @@ parseUrl(Protocol, Rest) ->
dbOpts(DbCfgs) -> dbOpts(DbCfgs) ->
BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL), BaseUrl = ?GET_FROM_LIST(baseUrl, DbCfgs, ?DEFAULT_BASE_URL),
DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME), DbName = ?GET_FROM_LIST(dbName, DbCfgs, ?DEFAULT_DBNAME),
UserPassword = ?GET_FROM_LIST(userPassword, DbCfgs, ?USER_PASSWORD),
User = ?GET_FROM_LIST(user, DbCfgs, ?DEFAULT_USER),
Password = ?GET_FROM_LIST(password, DbCfgs, ?DEFAULT_PASSWORD),
PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE), PoolSize = ?GET_FROM_LIST(poolSize, DbCfgs, ?DEFAULT_POOL_SIZE),
SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS), SocketOpts = ?GET_FROM_LIST(socketOpts, DbCfgs, ?DEFAULT_SOCKET_OPTS),
DbOpts = agMiscUtils:parseUrl(BaseUrl), DbOpts = agMiscUtils:parseUrl(BaseUrl),
UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(UserPassword))/binary>>},
DbOpts#dbOpts{dbName = DbName, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
UserPasswordBase64 = {<<"Authorization">>, <<"Basic ", (base64:encode(<<User/binary, ":", Password/binary>>))/binary>>},
DbOpts#dbOpts{dbName = <<"_db/", DbName/binary>>, userPassword = UserPasswordBase64, poolSize = PoolSize, socketOpts = SocketOpts}.
-spec agencyOpts(list()) -> agencyOpts(). -spec agencyOpts(list()) -> agencyOpts().
agencyOpts(AgencyCfgs) -> agencyOpts(AgencyCfgs) ->

Loading…
Cancel
Save