#include #include "erl_nif.h" #include "concurrentqueue.h" #include struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits { static const size_t BLOCK_SIZE = 16; static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 16; static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 8; static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 8; static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 16; static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256; static inline void *malloc(std::size_t size) { return enif_alloc(size); } static inline void free(void *ptr) { enif_free(ptr); } }; using lfqIns = moodycamel::ConcurrentQueue *; const size_t BulkDelCnt = 200; ERL_NIF_TERM atomOk; ERL_NIF_TERM atomError; ERL_NIF_TERM atomNewErr; ERL_NIF_TERM atomTrue; ERL_NIF_TERM atomFalse; ERL_NIF_TERM atomEmpty; void eLfqFree(ErlNifEnv *, void *obj) { lfqIns *ObjIns = static_cast(obj); if (NULL != ObjIns && NULL != *ObjIns) { std::vector TermBinList(BulkDelCnt); size_t OutSize; do{ OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size()); for (int i = OutSize - 1; i >= 0; --i) { enif_release_binary(&TermBinList[i]); } }while(OutSize >= BulkDelCnt); delete (*ObjIns); *ObjIns = NULL; } } int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) { ErlNifResourceFlags flags = static_cast(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); ErlNifResourceType *ResIns = NULL; ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL); if (NULL == ResIns) return -1; *priv_data = ResIns; atomOk = enif_make_atom(env, "ok"); atomTrue = enif_make_atom(env, "true"); atomFalse = enif_make_atom(env, "false"); atomError = enif_make_atom(env, "lfq_error"); atomEmpty = enif_make_atom(env, "lfq_empty"); atomNewErr = enif_make_atom(env, "error"); return 0; } int nifUpgrade(ErlNifEnv *env, void **priv_data, void **, ERL_NIF_TERM) { ErlNifResourceFlags flags = static_cast(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); ErlNifResourceType *ResIns = NULL; ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL); if (NULL == ResIns) return -1; *priv_data = ResIns; return 0; } void nifUnload(ErlNifEnv *, void *priv_data) { } ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = static_cast(enif_alloc_resource(ResIns, sizeof(lfqIns))); if (ObjIns == NULL) return atomNewErr; *ObjIns = new moodycamel::ConcurrentQueue; if (*ObjIns == NULL) return atomNewErr; ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns); enif_release_resource(ObjIns); return enif_make_tuple2(env, atomOk, RefTerm); } ERL_NIF_TERM nifDel1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) { return enif_make_badarg(env); } if (NULL != ObjIns && NULL != *ObjIns) { std::vector TermBinList(BulkDelCnt); size_t OutSize; do{ OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size()); for (int i = OutSize - 1; i >= 0; --i) { enif_release_binary(&TermBinList[i]); } }while(OutSize >= BulkDelCnt); delete (*ObjIns); *ObjIns = NULL; } return atomOk; } ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) { return enif_make_badarg(env); } ErlNifBinary TermBin; if (!enif_term_to_binary(env, argv[1], &TermBin)) return enif_make_badarg(env); if ((*ObjIns)->enqueue(TermBin)) { return atomTrue; } else { enif_release_binary(&TermBin); return atomFalse; } } ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) { return enif_make_badarg(env); } ERL_NIF_TERM List; ERL_NIF_TERM Head; List = argv[1]; if (!enif_is_list(env, List)) { return enif_make_badarg(env); } unsigned ListLen; enif_get_list_length(env, List, &ListLen); std::vector TermBinList; ErlNifBinary TermBin; while (enif_get_list_cell(env, List, &Head, &List)) { if (!enif_term_to_binary(env, Head, &TermBin)) { for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) { enif_release_binary(&(*it)); } return enif_make_badarg(env); } TermBinList.push_back(TermBin); } if ((*ObjIns)->enqueue_bulk(TermBinList.cbegin(), TermBinList.size())) { return atomTrue; } else { for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) { enif_release_binary(&(*it)); } return atomFalse; } } ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) { return enif_make_badarg(env); } ErlNifBinary TermBin; if ((*ObjIns)->try_dequeue(TermBin)) { ERL_NIF_TERM OutTerm; if (enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0) { enif_release_binary(&TermBin); return atomError; } else { enif_release_binary(&TermBin); return OutTerm; } } else { return atomEmpty; } } ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) { return enif_make_badarg(env); } unsigned int OutLen; if (!enif_get_uint(env, argv[1], &OutLen)) { return enif_make_badarg(env); } std::vector TermBinList(OutLen); size_t OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size()); ERL_NIF_TERM RetList = enif_make_list(env, 0); ERL_NIF_TERM OutTerm; for (int i = OutSize - 1; i >= 0; --i) { if (enif_binary_to_term(env, TermBinList[i].data, TermBinList[i].size, &OutTerm, 0) == 0) { enif_release_binary(&TermBinList[i]); } else { enif_release_binary(&TermBinList[i]); RetList = enif_make_list_cell(env, OutTerm, RetList); } } return RetList; } ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns *ObjIns = NULL; if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns) || NULL == *ObjIns) { return enif_make_badarg(env); } size_t LfqSize = (*ObjIns)->size_approx(); return enif_make_long(env, (long int) LfqSize); } static ErlNifFunc nifFuncs[] = { {"new", 0, nifNew}, {"del", 1, nifDel1}, {"in", 2, nifIn2}, {"ins", 2, nifIns2}, {"tryOut", 1, nifTryOut1}, {"tryOuts", 2, nifTryOuts2}, {"size", 1, nifSize1} }; ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload)