arangodb erlang数据库驱动
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
11 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. -module(agVstCli).
  2. -include("agVstCli.hrl").
  3. -include("eArango.hrl").
  4. -compile(inline).
  5. -compile({inline_size, 128}).
  6. -export([
  7. %% Common Request API
  8. callAgency/5
  9. , callAgency/6
  10. , callAgency/7
  11. , castAgency/5
  12. , castAgency/6
  13. , castAgency/7
  14. , castAgency/8
  15. , receiveRequestRet/2
  16. %% Pools API
  17. , startPool/2
  18. , startPool/3
  19. , stopPool/1
  20. %% Single Process DbAPI
  21. , connectDb/1
  22. , disConnectDb/1
  23. , getCurDbInfo/1
  24. , useDatabase/2
  25. ]).
  %% Synchronous request using the defaults: non-system call (IsSystem = false)
  %% and ?DEFAULT_TIMEOUT. Delegates to callAgency/7.
  -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body()) -> term() | {error, term()}.
  callAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
      IsSystem = false,
      Timeout = ?DEFAULT_TIMEOUT,
      callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout).
  %% Synchronous request with an explicit IsSystem flag; the timeout falls back
  %% to ?DEFAULT_TIMEOUT. Delegates to callAgency/7.
  -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) -> term() | {error, atom()}.
  callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
      Timeout = ?DEFAULT_TIMEOUT,
      callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout).
  %% Synchronous request: issues the request through castAgency/8 with the
  %% calling process as the reply target, then, if the cast reports that a
  %% reply is pending ({waitRRT, ...}), blocks in receiveRequestRet/2 until it
  %% arrives. Any other cast result (an error tuple, or the result castAgency/8
  %% already produced) is returned unchanged.
  -spec callAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) -> term() | {error, atom()}.
  callAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
      Caller = self(),
      CastRet = castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Caller, IsSystem, Timeout),
      case CastRet of
          {waitRRT, RequestId, MonitorRef} ->
              receiveRequestRet(RequestId, MonitorRef);
          Other ->
              Other
      end.
  %% Issue a request with the defaults: reply target is the calling process,
  %% IsSystem = false and ?DEFAULT_TIMEOUT. Delegates to castAgency/8.
  %%
  %% Returns whatever castAgency/8 returns:
  %%   {waitRRT, RequestId, MonitorRef} when the request was handed to a pooled
  %%     agency (collect the reply with receiveRequestRet/2);
  %%   {ok, Body, StatusCode, Headers} when the first argument is a socket and
  %%     the response was read directly;
  %%   {error, Reason} otherwise.
  -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body()) ->
      {waitRRT, messageId(), reference()} | {ok, term(), term(), term()} | {error, term()}.
  castAgency(PoolNameOrSocket, Method, Path, Headers, Body) ->
      castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), false, ?DEFAULT_TIMEOUT).
  %% Issue a request with an explicit IsSystem flag; the reply target is the
  %% calling process and the timeout is ?DEFAULT_TIMEOUT. Delegates to
  %% castAgency/8.
  %%
  %% Returns whatever castAgency/8 returns:
  %%   {waitRRT, RequestId, MonitorRef} when the request was handed to a pooled
  %%     agency (collect the reply with receiveRequestRet/2);
  %%   {ok, Body, StatusCode, Headers} when the first argument is a socket and
  %%     the response was read directly;
  %%   {error, Reason} otherwise.
  -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean()) ->
      {waitRRT, messageId(), reference()} | {ok, term(), term(), term()} | {error, term()}.
  castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem) ->
      castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, ?DEFAULT_TIMEOUT).
  %% Issue a request with an explicit IsSystem flag and timeout; the reply
  %% target is the calling process. Delegates to castAgency/8.
  %%
  %% Returns whatever castAgency/8 returns:
  %%   {waitRRT, RequestId, MonitorRef} when the request was handed to a pooled
  %%     agency (collect the reply with receiveRequestRet/2);
  %%   {ok, Body, StatusCode, Headers} when the first argument is a socket and
  %%     the response was read directly;
  %%   {error, Reason} otherwise.
  -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), boolean(), timeout()) ->
      {waitRRT, messageId(), reference()} | {ok, term(), term(), term()} | {error, term()}.
  castAgency(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem, Timeout) ->
      castAgency(PoolNameOrSocket, Method, Path, Headers, Body, self(), IsSystem, Timeout).
  %% Issue one request, either through a pool or directly over a socket.
  %%
  %% Pool mode (first argument is an atom): picks an agency from the pool,
  %% monitors it, sends it an #agReq{} whose reply should go to Pid, and returns
  %% {waitRRT, RequestId, MonitorRef} without waiting. The reply is collected
  %% with receiveRequestRet/2. Timeout is turned into an absolute monotonic
  %% deadline (milliseconds) carried in the request.
  %%
  %% Socket mode (anything else): looks up the connection info stored for the
  %% socket in the calling process, sends the encoded request and blocks until
  %% the response has been read, returning {ok, Body, StatusCode, Headers}.
  %% Pid and Timeout are not used in this mode; the receive loops it calls have
  %% no `after` clause.
  %%
  %% Errors: {error, pool_not_found}, {error, undefined_server},
  %% {error, dbinfoNotFound}, a send error, or an error from the receive loop.
  -spec castAgency(poolNameOrSocket(), method(), path(), headers(), body(), pid(), boolean(), timeout()) ->
      {waitRRT, messageId(), reference()} | {ok, term(), term(), term()} | {error, term()}.
  castAgency(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout) when is_atom(PoolNameOrSocket) ->
      castViaPool(PoolNameOrSocket, Method, Path, Headers, Body, Pid, IsSystem, Timeout);
  castAgency(PoolNameOrSocket, Method, Path, Headers, Body, _Pid, IsSystem, _Timeout) ->
      castViaSocket(PoolNameOrSocket, Method, Path, Headers, Body, IsSystem).

  %% Hand the request to one agency process of the named pool.
  castViaPool(PoolName, Method, Path, Headers, Body, Pid, IsSystem, Timeout) ->
      case agAgencyPoolMgrIns:getOneAgency(PoolName) of
          {error, pool_not_found} = Err ->
              Err;
          undefined ->
              {error, undefined_server};
          AgencyName ->
              MonitorRef = erlang:monitor(process, AgencyName),
              RequestId = {AgencyName, MonitorRef},
              Request = #agReq{method = Method, path = Path, headers = Headers, body = Body, messageId = RequestId, fromPid = Pid, overTime = calcOverTime(Timeout), isSystem = IsSystem},
              %% Sending to a name that is not registered raises badarg. That
              %% case is already reported to the caller through the monitor
              %% ('DOWN' with reason noproc), so only that error is ignored.
              try
                  AgencyName ! Request
              catch
                  error:badarg -> ok
              end,
              {waitRRT, RequestId, MonitorRef}
      end.

  %% Send the request over a socket owned by the calling process and read the reply.
  castViaSocket(Socket, Method, Path, Headers, Body, IsSystem) ->
      case getCurDbInfo(Socket) of
          {DbName, UserPassWord, Host, Protocol} ->
              Request = agVstProtocol:request(IsSystem, Body, Method, Host, DbName, Path, [UserPassWord | Headers]),
              sendRecvOnSocket(Protocol, Socket, Request, Method == ?AgHead);
          _ ->
              {error, dbinfoNotFound}
      end.

  %% Transport-specific send followed by the matching receive loop; on a send
  %% error the socket is closed and its stored connection info dropped.
  sendRecvOnSocket(tcp, Socket, Request, IsHeadMethod) ->
      case gen_tcp:send(Socket, Request) of
          ok ->
              receiveTcpData(undefined, Socket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), IsHeadMethod);
          {error, Reason} = Err ->
              ?AgWarn(castAgency, ":gen_tcp send error: ~p ~n", [Reason]),
              disConnectDb(Socket),
              Err
      end;
  sendRecvOnSocket(ssl, Socket, Request, IsHeadMethod) ->
      case ssl:send(Socket, Request) of
          ok ->
              receiveSslData(undefined, Socket, binary:compile_pattern(<<"\r\n">>), binary:compile_pattern(<<"\r\n\r\n">>), IsHeadMethod);
          {error, Reason} = Err ->
              ?AgWarn(castAgency, ":ssl send error: ~p ~n", [Reason]),
              disConnectDb(Socket),
              Err
      end.

  %% Convert a relative timeout into an absolute monotonic deadline in milliseconds.
  calcOverTime(infinity) ->
      infinity;
  calcOverTime(Timeout) ->
      erlang:monotonic_time(millisecond) + Timeout.
  %% Wait for the reply to a request previously issued in pool mode.
  %%
  %% RequestId and MonitorRef are the values returned in the
  %% {waitRRT, RequestId, MonitorRef} tuple. Blocks (no timeout in this
  %% function) until either:
  %%   * an #agReqRet{} carrying RequestId arrives: the monitor is removed and,
  %%     when the reply is a 3-tuple {StatusCode, Body, Headers}, the body is
  %%     replaced by its JSON-decoded form (an empty body becomes #{}); any
  %%     other reply is returned unchanged; or
  %%   * the monitored agency goes down: {error, {agencyDown, Reason}}.
  -spec receiveRequestRet(messageId(), reference()) -> {StatusCode :: non_neg_integer(), Body :: term(), Headers :: binary()} | {error, term()}.
  receiveRequestRet(RequestId, MonitorRef) ->
      receive
          #agReqRet{messageId = RequestId, reply = Reply} ->
              %% flush also discards a 'DOWN' message that may already have
              %% been queued if the agency exited right after replying, so no
              %% stale message is left in the caller's mailbox.
              erlang:demonitor(MonitorRef, [flush]),
              case Reply of
                  {_StatusCode, <<>>, _Headers} ->
                      erlang:setelement(2, Reply, #{});
                  {_StatusCode, Body, _Headers} ->
                      erlang:setelement(2, Reply, jiffy:decode(Body, [return_maps, copy_strings]));
                  _ ->
                      Reply
              end;
          {'DOWN', MonitorRef, process, _Pid, Reason} ->
              {error, {agencyDown, Reason}}
      end.
  %% Read and parse one response arriving as {tcp, Socket, Data} messages.
  %%
  %% Each chunk is fed to agVstProtocol:response/5 together with the parser
  %% state accumulated so far (undefined for the first chunk). Rn and RnRn are
  %% the precompiled "\r\n" and "\r\n\r\n" patterns; IsHeadMethod is passed
  %% through to the parser.
  %%
  %% Returns {ok, Body, StatusCode, Headers} once the parser reports `done`;
  %% Body is the JSON-decoded body, or #{} when the body is empty. On a parser
  %% error, a parser crash, a closed socket or a socket error the connection is
  %% torn down with disConnectDb/1 and {error, Reason} is returned.
  %% There is no `after` clause: the call blocks until one of these messages
  %% arrives.
  -spec receiveTcpData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) ->
      {ok, Body :: term(), StatusCode :: term(), Headers :: term()} | {error, term()}.
  receiveTcpData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
      receive
          {tcp, Socket, Data} ->
              try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
                  {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
                      case Body of
                          <<>> ->
                              {ok, #{}, StatusCode, Headers};
                          _ ->
                              {ok, jiffy:decode(Body, [return_maps, copy_strings]), StatusCode, Headers}
                      end;
                  {ok, NewRecvState} ->
                      %% Response incomplete: keep reading with the updated state.
                      receiveTcpData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
                  {error, Reason} ->
                      ?AgWarn(receiveTcpData, "handle tcp data error: ~p ~n", [Reason]),
                      disConnectDb(Socket),
                      {error, {tcpDataError, Reason}}
              catch
                  E:R:S ->
                      ?AgWarn(receiveTcpData, "handle tcp data crash: ~p:~p~n~p ~n ", [E, R, S]),
                      disConnectDb(Socket),
                      {error, handledataError}
              end;
          {tcp_closed, Socket} ->
              disConnectDb(Socket),
              {error, tcp_closed};
          {tcp_error, Socket, Reason} ->
              disConnectDb(Socket),
              {error, {tcp_error, Reason}}
      end.
  %% Read and parse one response arriving as {ssl, Socket, Data} messages.
  %%
  %% Each chunk is fed to agVstProtocol:response/5 together with the parser
  %% state accumulated so far (undefined for the first chunk). Rn and RnRn are
  %% the precompiled "\r\n" and "\r\n\r\n" patterns; IsHeadMethod is passed
  %% through to the parser.
  %%
  %% Returns {ok, Body, StatusCode, Headers} once the parser reports `done`;
  %% Body is the JSON-decoded body, or #{} when the body is empty. On a parser
  %% error, a parser crash, a closed socket or a socket error the connection is
  %% torn down with disConnectDb/1 and {error, Reason} is returned.
  %% There is no `after` clause: the call blocks until one of these messages
  %% arrives.
  -spec receiveSslData(recvState() | undefined, socket(), binary:cp(), binary:cp(), boolean()) ->
      {ok, Body :: term(), StatusCode :: term(), Headers :: term()} | {error, term()}.
  receiveSslData(RecvState, Socket, Rn, RnRn, IsHeadMethod) ->
      receive
          {ssl, Socket, Data} ->
              try agVstProtocol:response(RecvState, Rn, RnRn, Data, IsHeadMethod) of
                  {done, #recvState{statusCode = StatusCode, headers = Headers, body = Body}} ->
                      case Body of
                          <<>> ->
                              {ok, #{}, StatusCode, Headers};
                          _ ->
                              {ok, jiffy:decode(Body, [return_maps, copy_strings]), StatusCode, Headers}
                      end;
                  {ok, NewRecvState} ->
                      %% Response incomplete: keep reading with the updated state.
                      receiveSslData(NewRecvState, Socket, Rn, RnRn, IsHeadMethod);
                  {error, Reason} ->
                      %% Log text names the ssl transport (it previously said "tcp").
                      ?AgWarn(receiveSslData, "handle ssl data error: ~p ~n", [Reason]),
                      disConnectDb(Socket),
                      {error, {sslDataError, Reason}}
              catch
                  E:R:S ->
                      ?AgWarn(receiveSslData, "handle ssl data crash: ~p:~p~n~p ~n ", [E, R, S]),
                      disConnectDb(Socket),
                      {error, handledataError}
              end;
          {ssl_closed, Socket} ->
              disConnectDb(Socket),
              {error, ssl_closed};
          {ssl_error, Socket, Reason} ->
              disConnectDb(Socket),
              {error, {ssl_error, Reason}}
      end.
  %% Start a pool named PoolName for the given database configuration, passing
  %% an empty agency configuration list to the pool manager.
  -spec startPool(poolName(), dbCfgs()) -> ok | {error, poolNameUsed}.
  startPool(PoolName, DbCfgs) ->
      AgencyCfgs = [],
      agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs).
  %% Start a pool named PoolName with explicit database and agency
  %% configuration; forwards to agAgencyPoolMgrIns:startPool/3.
  -spec startPool(poolName(), dbCfgs(), agencyCfgs()) -> ok | {error, poolNameUsed}.
  startPool(PoolName, DbCfgs, AgencyCfgs) ->
      StartRet = agAgencyPoolMgrIns:startPool(PoolName, DbCfgs, AgencyCfgs),
      StartRet.
  %% Stop the pool named PoolName; forwards to agAgencyPoolMgrIns:stopPool/1.
  -spec stopPool(poolName()) -> ok | {error, poolNotStarted}.
  stopPool(PoolName) ->
      StopRet = agAgencyPoolMgrIns:stopPool(PoolName),
      StopRet.
  %% Open a single connection for use by the calling process.
  %%
  %% Resolves the configured hostname to its IPv4 addresses, picks one via
  %% agMiscUtils:randomElement/1 and connects over tcp or ssl with
  %% ?AgDefConnTimeout. On success the connection info (database name,
  %% credentials, host, protocol) is stored in the calling process's
  %% dictionary, keyed by the socket, and {ok, Socket} is returned. Resolution
  %% and connect failures are logged and returned as {error, Reason}.
  -spec connectDb(dbCfgs()) -> {ok, socket()} | {error, term()}.
  connectDb(DbCfgs) ->
      #dbOpts{
          host = Host,
          port = Port,
          hostname = HostName,
          dbName = DbName,
          protocol = Protocol,
          userPassword = UserPassword,
          socketOpts = SocketOpts
      } = agMiscUtils:dbOpts(DbCfgs),
      case inet:getaddrs(HostName, inet) of
          {error, DnsReason} = DnsErr ->
              ?AgWarn(connectDb, "getaddrs error: ~p~n", [DnsReason]),
              DnsErr;
          {ok, IPList} ->
              Ip = agMiscUtils:randomElement(IPList),
              ConnRet =
                  case Protocol of
                      tcp ->
                          gen_tcp:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout);
                      ssl ->
                          ssl:connect(Ip, Port, SocketOpts, ?AgDefConnTimeout)
                  end,
              case ConnRet of
                  {ok, Socket} ->
                      setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol),
                      {ok, Socket};
                  {error, ConnReason} = ConnErr ->
                      ?AgWarn(connectDb, "connect error: ~p~n", [ConnReason]),
                      ConnErr
              end
      end.
  %% Close a connection opened by connectDb/1 and forget its stored info.
  %%
  %% Removes the connection info kept for Socket in the calling process's
  %% dictionary and closes the socket with the transport recorded there.
  %% Returns `ignore` when no info is stored for Socket in this process (for
  %% example it was already disconnected), otherwise the result of the
  %% transport's close call.
  -spec disConnectDb(socket()) -> ok | ignore | {error, term()}.
  disConnectDb(Socket) ->
      case erlang:erase({'$agDbInfo', Socket}) of
          undefined ->
              ignore;
          {_DbName, _UserPassword, _Host, Protocol} ->
              case Protocol of
                  tcp ->
                      gen_tcp:close(Socket);
                  ssl ->
                      ssl:close(Socket)
              end
      end.
  %% Remember the connection info for Socket in the calling process's
  %% dictionary under the key {'$agDbInfo', Socket}. Returns the value
  %% previously stored under that key (undefined if none), as erlang:put/2 does.
  -spec setCurDbInfo(socket(), binary(), tuple(), host(), protocol()) -> term().
  setCurDbInfo(Socket, DbName, UserPassword, Host, Protocol) ->
      Key = {'$agDbInfo', Socket},
      DbInfo = {DbName, UserPassword, Host, Protocol},
      erlang:put(Key, DbInfo).
  %% Fetch the connection info stored for Socket in the calling process's
  %% dictionary: {DbName, UserPassword, Host, Protocol}, or undefined when
  %% nothing is stored for that socket in this process.
  -spec getCurDbInfo(socket()) -> term().
  getCurDbInfo(Socket) ->
      Key = {'$agDbInfo', Socket},
      erlang:get(Key).
  %% Switch the database used for subsequent requests on Socket.
  %%
  %% Rewrites the stored connection info so that its database entry becomes
  %% <<"/_db/", NewDbName/binary>>; credentials, host and protocol are kept.
  %% Does nothing when no info is stored for Socket in the calling process.
  %% Always returns ok.
  -spec useDatabase(socket(), binary()) -> ok.
  useDatabase(Socket, NewDbName) ->
      Key = {'$agDbInfo', Socket},
      case erlang:get(Key) of
          {_OldDbName, UserPassword, Host, Protocol} ->
              DbPath = <<"/_db/", NewDbName/binary>>,
              erlang:put(Key, {DbPath, UserPassword, Host, Protocol});
          undefined ->
              ignore
      end,
      ok.