You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
8.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  #include <string.h>
  #include <new>
  #include <vector>
  #include "erl_nif.h"
  #include "concurrentqueue.h"
  5. struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits {
  6. static const size_t BLOCK_SIZE = 16;
  7. static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 16;
  8. static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 8;
  9. static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 8;
  10. static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 16;
  11. static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
  12. static inline void *malloc(std::size_t size) { return enif_alloc(size); }
  13. static inline void free(void *ptr) { enif_free(ptr); }
  14. };
  15. using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits> *;
  16. const size_t BulkDelCnt = 200;
  17. ERL_NIF_TERM atomOk;
  18. ERL_NIF_TERM atomError;
  19. ERL_NIF_TERM atomNewErr;
  20. ERL_NIF_TERM atomTrue;
  21. ERL_NIF_TERM atomFalse;
  22. ERL_NIF_TERM atomEmpty;
  23. void eLfqFree(ErlNifEnv *, void *obj) {
  24. lfqIns *ObjIns = static_cast<lfqIns *>(obj);
  25. if (NULL != ObjIns && NULL != *ObjIns) {
  26. std::vector <ErlNifBinary> TermBinList(BulkDelCnt);
  27. size_t OutSize;
  28. do{
  29. OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size());
  30. for (int i = OutSize - 1; i >= 0; i--) {
  31. enif_release_binary(&TermBinList[i]);
  32. }
  33. }while(OutSize >= BulkDelCnt);
  34. delete (*ObjIns);
  35. *ObjIns = NULL;
  36. }
  37. }
  38. int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) {
  39. ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
  40. ErlNifResourceType *ResIns = NULL;
  41. ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL);
  42. if (NULL == ResIns)
  43. return -1;
  44. *priv_data = ResIns;
  45. atomOk = enif_make_atom(env, "ok");
  46. atomTrue = enif_make_atom(env, "true");
  47. atomFalse = enif_make_atom(env, "false");
  48. atomError = enif_make_atom(env, "lfq_error");
  49. atomEmpty = enif_make_atom(env, "lfq_empty");
  50. atomNewErr = enif_make_atom(env, "error");
  51. return 0;
  52. }
  53. int nifUpgrade(ErlNifEnv *env, void **priv_data, void **, ERL_NIF_TERM) {
  54. ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
  55. ErlNifResourceType *ResIns = NULL;
  56. ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL);
  57. if (NULL == ResIns)
  58. return -1;
  59. *priv_data = ResIns;
  60. return 0;
  61. }
  62. void nifUnload(ErlNifEnv *, void *priv_data) {
  63. }
  64. ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) {
  65. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  66. lfqIns *ObjIns = static_cast<lfqIns *>(enif_alloc_resource(ResIns, sizeof(lfqIns)));
  67. *ObjIns = new moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits>;
  68. if (ObjIns == NULL)
  69. return atomNewErr;
  70. if (*ObjIns == NULL)
  71. return atomNewErr;
  72. ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns);
  73. enif_release_resource(ObjIns);
  74. return enif_make_tuple2(env, atomOk, RefTerm);
  75. }
  76. ERL_NIF_TERM nifDel1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  77. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  78. lfqIns *ObjIns = NULL;
  79. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) {
  80. return enif_make_badarg(env);
  81. }
  82. if (NULL != ObjIns && NULL != *ObjIns) {
  83. std::vector <ErlNifBinary> TermBinList(BulkDelCnt);
  84. size_t OutSize;
  85. do{
  86. OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size());
  87. for (int i = OutSize - 1; i >= 0; i--) {
  88. enif_release_binary(&TermBinList[i]);
  89. }
  90. }while(OutSize >= BulkDelCnt);
  91. delete (*ObjIns);
  92. *ObjIns = NULL;
  93. }
  94. return atomOk;
  95. }
  96. ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  97. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  98. lfqIns *ObjIns = NULL;
  99. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) {
  100. return enif_make_badarg(env);
  101. }
  102. ErlNifBinary TermBin;
  103. if (!enif_term_to_binary(env, argv[1], &TermBin))
  104. return enif_make_badarg(env);
  105. if ((*ObjIns)->enqueue(TermBin)) {
  106. return atomTrue;
  107. } else {
  108. enif_release_binary(&TermBin);
  109. return atomFalse;
  110. }
  111. }
  112. ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  113. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  114. lfqIns *ObjIns = NULL;
  115. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) {
  116. return enif_make_badarg(env);
  117. }
  118. ERL_NIF_TERM List;
  119. ERL_NIF_TERM Head;
  120. List = argv[1];
  121. if (!enif_is_list(env, List)) {
  122. return enif_make_badarg(env);
  123. }
  124. unsigned ListLen;
  125. enif_get_list_length(env, List, &ListLen);
  126. std::vector <ErlNifBinary> TermBinList;
  127. ErlNifBinary TermBin;
  128. while (enif_get_list_cell(env, List, &Head, &List)) {
  129. if (!enif_term_to_binary(env, Head, &TermBin)) {
  130. for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) {
  131. enif_release_binary(&(*it));
  132. }
  133. return enif_make_badarg(env);
  134. }
  135. TermBinList.push_back(TermBin);
  136. }
  137. if ((*ObjIns)->enqueue_bulk(TermBinList.cbegin(), TermBinList.size())) {
  138. return atomTrue;
  139. } else {
  140. for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) {
  141. enif_release_binary(&(*it));
  142. }
  143. return atomFalse;
  144. }
  145. }
  146. ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  147. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  148. lfqIns *ObjIns = NULL;
  149. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) {
  150. return enif_make_badarg(env);
  151. }
  152. ErlNifBinary TermBin;
  153. if ((*ObjIns)->try_dequeue(TermBin)) {
  154. ERL_NIF_TERM OutTerm;
  155. if (enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0) {
  156. enif_release_binary(&TermBin);
  157. return atomError;
  158. } else {
  159. enif_release_binary(&TermBin);
  160. return OutTerm;
  161. }
  162. } else {
  163. return atomEmpty;
  164. }
  165. }
  166. ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  167. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  168. lfqIns *ObjIns = NULL;
  169. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) {
  170. return enif_make_badarg(env);
  171. }
  172. unsigned int OutLen;
  173. if (!enif_get_uint(env, argv[1], &OutLen)) {
  174. return enif_make_badarg(env);
  175. }
  176. std::vector <ErlNifBinary> TermBinList(OutLen);
  177. size_t OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size());
  178. ERL_NIF_TERM RetList = enif_make_list(env, 0);
  179. ERL_NIF_TERM OutTerm;
  180. for (int i = OutSize - 1; i >= 0; i--) {
  181. if (enif_binary_to_term(env, TermBinList[i].data, TermBinList[i].size, &OutTerm, 0) == 0) {
  182. enif_release_binary(&TermBinList[i]);
  183. } else {
  184. enif_release_binary(&TermBinList[i]);
  185. RetList = enif_make_list_cell(env, OutTerm, RetList);
  186. }
  187. }
  188. return RetList;
  189. }
  190. ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
  191. ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
  192. lfqIns *ObjIns = NULL;
  193. if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) {
  194. return enif_make_badarg(env);
  195. }
  196. size_t LfqSize = (*ObjIns)->size_approx();
  197. return enif_make_long(env, (long int) LfqSize);
  198. }
  199. static ErlNifFunc nifFuncs[] =
  200. {
  201. {"new", 0, nifNew},
  202. {"del", 1, nifDel1},
  203. {"in", 2, nifIn2},
  204. {"ins", 2, nifIns2},
  205. {"tryOut", 1, nifTryOut1},
  206. {"tryOuts", 2, nifTryOuts2},
  207. {"size", 1, nifSize1}
  208. };
  209. ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload)