diff --git a/c_src/eLfq/eLfq.cc b/c_src/eLfq/eLfq.cc index 7e19d71..886535c 100644 --- a/c_src/eLfq/eLfq.cc +++ b/c_src/eLfq/eLfq.cc @@ -1,38 +1,29 @@ #include #include "erl_nif.h" #include "concurrentqueue.h" +#include -struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits -{ +struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits { // static const size_t BLOCK_SIZE = 8; // static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 4; // static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4; // static const size_t INITIAL_IMPLCICIT_PRODUCER_HASH_SIZE = 1; // static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 24; - static inline void* malloc(std::size_t size) { return enif_alloc(size); } - static inline void free(void* ptr) { enif_free(ptr); } + static inline void *malloc(std::size_t size) { return enif_alloc(size); } + + static inline void free(void *ptr) { enif_free(ptr); } }; -using lfqIns = moodycamel::ConcurrentQueue *; +using lfqIns = moodycamel::ConcurrentQueue *; ERL_NIF_TERM atomOk; ERL_NIF_TERM atomError; +ERL_NIF_TERM atomNewErr; ERL_NIF_TERM atomTrue; ERL_NIF_TERM atomFalse; ERL_NIF_TERM atomEmpty; -ERL_NIF_TERM make_binary(ErlNifEnv *env, const char *buff, size_t length) { - ERL_NIF_TERM term; - unsigned char *destination_buffer = enif_make_new_binary(env, length, &term); - memcpy(destination_buffer, buff, length); - return term; -} - -ERL_NIF_TERM make_error(ErlNifEnv *env, const char *error) { - return enif_make_tuple2(env, atomError, make_binary(env, error, strlen(error))); -} - void eLfqFree(ErlNifEnv *, void *obj) { lfqIns *ObjIns = static_cast(obj); if (ObjIns != nullptr) { @@ -54,11 +45,11 @@ int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) { *priv_data = ResIns; atomOk = enif_make_atom(env, "ok"); - atomError = enif_make_atom(env, "lfq_error"); atomTrue = enif_make_atom(env, "true"); atomFalse = enif_make_atom(env, "false"); + atomError = enif_make_atom(env, "lfq_error"); atomEmpty = enif_make_atom(env, "lfq_empty"); - + atomNewErr = enif_make_atom(env, "error"); return 0; } @@ -79,17 +70,38 @@ void nifUnload(ErlNifEnv *, void *priv_data) { ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); - lfqIns* ObjIns = static_cast(enif_alloc_resource(ResIns, sizeof(lfqIns))); + lfqIns *ObjIns = static_cast(enif_alloc_resource(ResIns, sizeof(lfqIns))); *ObjIns = new moodycamel::ConcurrentQueue; if (ObjIns == NULL) - return make_error(env, "enif_alloc_resource failed"); + return atomNewErr; ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns); enif_release_resource(ObjIns); return enif_make_tuple2(env, atomOk, RefTerm); } +ERL_NIF_TERM nifDel1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { + ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); + + lfqIns *ObjIns = NULL; + + if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) { + return enif_make_badarg(env); + } + + if (NULL != ObjIns) { + ErlNifBinary TermBin; + while ((*ObjIns)->try_dequeue(TermBin)) { + enif_release_binary(&TermBin); + } + + delete ObjIns; + } + + return atomOk; +} + ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); @@ -100,10 +112,10 @@ ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { } ErlNifBinary TermBin; - if(!enif_term_to_binary(env, argv[1], &TermBin)) + if (!enif_term_to_binary(env, argv[1], &TermBin)) return enif_make_badarg(env); - if ((*ObjIns)->enqueue(TermBin)){ + if ((*ObjIns)->enqueue(TermBin)) { return atomTrue; } else { enif_release_binary(&TermBin); @@ -120,20 +132,36 @@ ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { return enif_make_badarg(env); } - std::vector data; - for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) { - data.push_back(i); + ERL_NIF_TERM List; + ERL_NIF_TERM Head; + + List = argv[1]; + if (!enif_is_list(env, List)) { + return enif_make_badarg(env); } + unsigned ListLen; + enif_get_list_length(env, List, &ListLen); + + std::vector TermBinList; + ErlNifBinary TermBin; - if(!enif_term_to_binary(env, argv[1], &TermBin)) - return enif_make_badarg(env); + while (enif_get_list_cell(env, List, &Head, &List)) { + if (!enif_term_to_binary(env, Head, &TermBin)) { + for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) { + enif_release_binary(&(*it)); + } + return enif_make_badarg(env); + } - if ((*ObjIns)->enqueue_bulk(data.cbegin(), data.size())){ + TermBinList.push_back(TermBin); + } + + if ((*ObjIns)->enqueue_bulk(TermBinList.cbegin(), TermBinList.size())) { return atomTrue; } else { - for(auto it = data.cbegin(), it != data.cend(), it++){ - enif_release_binary(&TermBin); + for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) { + enif_release_binary(&(*it)); } return atomFalse; } @@ -151,12 +179,10 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { if ((*ObjIns)->try_dequeue(TermBin)) { ERL_NIF_TERM OutTerm; - if(enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0) - { + if (enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0) { enif_release_binary(&TermBin); - // return make_error(env, atomError "failed to decode data"); return atomError; - } else{ + } else { enif_release_binary(&TermBin); return OutTerm; } @@ -166,9 +192,34 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { } ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; -} + ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); + lfqIns *ObjIns = NULL; + if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) { + return enif_make_badarg(env); + } + + unsigned int OutLen; + if (!enif_get_uint(env, argv[1], &OutLen)) { + return enif_make_badarg(env); + } + + std::vector TermBinList(OutLen); + size_t OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size()); + + ERL_NIF_TERM RetList = enif_make_list(env, 0); + + ERL_NIF_TERM OutTerm; + for (int i = OutSize - 1; i >= 0; i--) { + if (enif_binary_to_term(env, TermBinList[i].data, TermBinList[i].size, &OutTerm, 0) == 0) { + enif_release_binary(&TermBinList[i]); + } else { + enif_release_binary(&TermBinList[i]); + RetList = enif_make_list_cell(env, OutTerm, RetList); + } + } + return RetList; +} ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); @@ -179,17 +230,18 @@ ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { } size_t LfqSize = (*ObjIns)->size_approx(); - return enif_make_long(env, (long int)LfqSize); + return enif_make_long(env, (long int) LfqSize); } static ErlNifFunc nifFuncs[] = { - {"new", 0, nifNew}, - {"in", 2, nifIn2}, - {"ins", 2, nifIns2}, - {"tryOut", 1, nifTryOut1}, - {"tryOuts", 2, nifTryOuts2}, - {"size", 1, nifSize1} + {"new", 0, nifNew}, + {"del", 1, nifDel1}, + {"in", 2, nifIn2}, + {"ins", 2, nifIns2}, + {"tryOut", 1, nifTryOut1}, + {"tryOuts", 2, nifTryOuts2}, + {"size", 1, nifSize1} }; ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload) \ No newline at end of file diff --git a/src/eLfq.erl b/src/eLfq.erl index 8587eb8..30c564c 100644 --- a/src/eLfq.erl +++ b/src/eLfq.erl @@ -8,6 +8,9 @@ % create the queue new/0 + % delete the queue + , del/1 + % Allocates more memory if necessary , in/2 % (item) : bool , ins/2 % (item_first, count) : bool @@ -36,26 +39,30 @@ init() -> end, erlang:load_nif(SoName, 0). --spec new() -> {ok, QueueRef :: reference()} | badarg | {error, Reason :: binary()}. +-spec new() -> {ok, QueueRef :: reference()} | error. new() -> ?NotLoaded. --spec in(QueueRef :: reference(), Data :: any()) -> true | {error, Reason :: binary()}. +-spec del(QueueRef :: reference()) -> ok. +del(_QueueRef) -> + ?NotLoaded. + +-spec in(QueueRef :: reference(), Data :: any()) -> true | false. in(_QueueRef, _Data) -> ?NotLoaded. --spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}. +-spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | false. ins(_QueueRef, _DataList) -> ?NotLoaded. --spec tryOut(QueueRef :: reference()) -> Data :: any() | {error, Reason :: binary()}. +-spec tryOut(QueueRef :: reference()) -> Data :: any() | lfq_empty | lfq_error. tryOut(_QueueRef) -> ?NotLoaded. --spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}. +-spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()]. tryOuts(_QueueRef, _Cnt) -> ?NotLoaded. --spec size(QueueRef :: reference()) -> pos_integer() | {error, Reason :: binary()}. +-spec size(QueueRef :: reference()) -> pos_integer(). size(_QueueRef) -> - ?NotLoaded. + ?NotLoaded. \ No newline at end of file