From 1cdbe5424f28a833077bbc148cda7c8dc56c15d1 Mon Sep 17 00:00:00 2001 From: SisMaker <156736github> Date: Tue, 25 Jan 2022 23:40:57 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20eLfq.cc=20=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- c_src/eLfq/eLfq.cc | 105 +++++++++++++++++++++------------------------ src/eLfq.erl | 54 ----------------------- 2 files changed, 49 insertions(+), 110 deletions(-) diff --git a/c_src/eLfq/eLfq.cc b/c_src/eLfq/eLfq.cc index dd14569..7e19d71 100644 --- a/c_src/eLfq/eLfq.cc +++ b/c_src/eLfq/eLfq.cc @@ -2,7 +2,19 @@ #include "erl_nif.h" #include "concurrentqueue.h" -using lfqIns = moodycamel::ConcurrentQueue *; +struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits +{ + // static const size_t BLOCK_SIZE = 8; + // static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 4; + // static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4; + // static const size_t INITIAL_IMPLCICIT_PRODUCER_HASH_SIZE = 1; + // static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 24; + + static inline void* malloc(std::size_t size) { return enif_alloc(size); } + static inline void free(void* ptr) { enif_free(ptr); } +}; + +using lfqIns = moodycamel::ConcurrentQueue *; ERL_NIF_TERM atomOk; ERL_NIF_TERM atomError; @@ -35,24 +47,24 @@ void eLfqFree(ErlNifEnv *, void *obj) { int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) { ErlNifResourceFlags flags = static_cast(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); - ErlNifResourceType *ResIns; + ErlNifResourceType *ResIns = NULL; ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL); if (NULL == ResIns) return -1; *priv_data = ResIns; atomOk = enif_make_atom(env, "ok"); - atomError = enif_make_atom(env, "error"); + atomError = enif_make_atom(env, "lfq_error"); atomTrue = enif_make_atom(env, "true"); atomFalse = enif_make_atom(env, "false"); - atomEmpty = enif_make_atom(env, "empty"); + atomEmpty = enif_make_atom(env, "lfq_empty"); return 0; } int nifUpgrade(ErlNifEnv *env, void **priv_data, void **, ERL_NIF_TERM) { ErlNifResourceFlags flags = static_cast(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); - ErlNifResourceType *ResIns; + ErlNifResourceType *ResIns = NULL; ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL); if (NULL == ResIns) return -1; @@ -68,13 +80,13 @@ ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) { ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); lfqIns* ObjIns = static_cast(enif_alloc_resource(ResIns, sizeof(lfqIns))); - *ObjIns = new moodycamel::ConcurrentQueue; + *ObjIns = new moodycamel::ConcurrentQueue; if (ObjIns == NULL) return make_error(env, "enif_alloc_resource failed"); ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns); - enif_release_resource(ResIns); + enif_release_resource(ObjIns); return enif_make_tuple2(env, atomOk, RefTerm); } @@ -99,32 +111,32 @@ ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { } } -ERL_NIF_TERM nifIn3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} - ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} + ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); -ERL_NIF_TERM nifIns3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} + lfqIns *ObjIns = NULL; -ERL_NIF_TERM nifTryIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} + if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) { + return enif_make_badarg(env); + } -ERL_NIF_TERM nifTryIn3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} + std::vector data; + for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) { + data.push_back(i); + } -ERL_NIF_TERM nifTryIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; -} + ErlNifBinary TermBin; + if(!enif_term_to_binary(env, argv[1], &TermBin)) + return enif_make_badarg(env); -ERL_NIF_TERM nifTryIns3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomTrue; + if ((*ObjIns)->enqueue_bulk(data.cbegin(), data.size())){ + return atomTrue; + } else { + for(auto it = data.cbegin(), it != data.cend(), it++){ + enif_release_binary(&TermBin); + } + return atomFalse; + } } ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { @@ -146,57 +158,38 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { return atomError; } else{ enif_release_binary(&TermBin); - return enif_make_tuple2(env, atomOk, OutTerm); + return OutTerm; } } else { return atomEmpty; } } -ERL_NIF_TERM nifTryOut2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; -} - ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { return atomEmpty; } -ERL_NIF_TERM nifTryOuts3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; -} -ERL_NIF_TERM nifTryOutByProd2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; -} +ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { + ErlNifResourceType *ResIns = static_cast(enif_priv_data(env)); + lfqIns *ObjIns = NULL; -ERL_NIF_TERM nifTryOutByProd3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; -} + if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) { + return enif_make_badarg(env); + } -ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { - return atomEmpty; + size_t LfqSize = (*ObjIns)->size_approx(); + return enif_make_long(env, (long int)LfqSize); } - static ErlNifFunc nifFuncs[] = { {"new", 0, nifNew}, {"in", 2, nifIn2}, - {"in", 3, nifIn3}, {"ins", 2, nifIns2}, - {"ins", 3, nifIns3}, - {"tryIn", 2, nifTryIn2}, - {"tryIn", 3, nifTryIn3}, - {"tryIns", 2, nifTryIns2}, - {"tryIns", 3, nifTryIns3}, {"tryOut", 1, nifTryOut1}, - {"tryOut", 2, nifTryOut2}, {"tryOuts", 2, nifTryOuts2}, - {"tryOuts", 3, nifTryOuts3}, - {"tryOutByProd", 2, nifTryOutByProd2}, - {"tryOutsByProd", 3, nifTryOutByProd3}, {"size", 1, nifSize1} }; -ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload -) \ No newline at end of file +ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload) \ No newline at end of file diff --git a/src/eLfq.erl b/src/eLfq.erl index 97dbdff..8587eb8 100644 --- a/src/eLfq.erl +++ b/src/eLfq.erl @@ -10,25 +10,11 @@ % Allocates more memory if necessary , in/2 % (item) : bool - , in/3 % (prod_token, item) : bool , ins/2 % (item_first, count) : bool - , ins/3 % (prod_token, item_first, count) : bool - - % Fails if not enough memory to enqueue - , tryIn/2 % (item) : bool - , tryIn/3 % (prod_token, item) : bool - , tryIns/2 % (item_first, count) : bool - , tryIns/3 % (prod_token, item_first, count) : bool % Attempts to dequeue from the queue (never allocates) , tryOut/1 % (item&) : bool - , tryOut/2 % (cons_token, item&) : bool , tryOuts/2 % (item_first, max) : size_t - , tryOuts/3 % (cons_token, item_first, max) : size_t - - % If you happen to know which producer you want to dequeue from - , tryOutByProd/2 % (prod_token, item&) : bool - , tryOutsByProd/3 % (prod_token, item_first, max) : size_t % A not-necessarily-accurate count of the total number of elements , size/1 @@ -58,58 +44,18 @@ new() -> in(_QueueRef, _Data) -> ?NotLoaded. --spec in(QueueRef :: reference(), ProdToken :: any(), Data :: any()) -> true | {error, Reason :: binary()}. -in(_QueueRef, _ProdToken, _Data) -> - ?NotLoaded. - -spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}. ins(_QueueRef, _DataList) -> ?NotLoaded. --spec ins(QueueRef :: reference(), ProdToken :: any(), DataList :: [any()]) -> true | {error, Reason :: binary()}. -ins(_QueueRef, _ProdToken, _DataList) -> - ?NotLoaded. - --spec tryIn(QueueRef :: reference(), Data :: any()) -> true | {error, Reason :: binary()}. -tryIn(_QueueRef, _Data) -> - ?NotLoaded. - --spec tryIn(QueueRef :: reference(), ProdToken :: any(), Data :: any()) -> true | {error, Reason :: binary()}. -tryIn(_QueueRef, _ProdToken, _Data) -> - ?NotLoaded. - --spec tryIns(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}. -tryIns(_QueueRef, _DataList) -> - ?NotLoaded. - --spec tryIns(QueueRef :: reference(), ProdToken :: any(), DataList :: [any()]) -> true | {error, Reason :: binary()}. -tryIns(_QueueRef, _ProdToken, _DataList) -> - ?NotLoaded. - -spec tryOut(QueueRef :: reference()) -> Data :: any() | {error, Reason :: binary()}. tryOut(_QueueRef) -> ?NotLoaded. --spec tryOut(QueueRef :: reference(), ConsToken :: any()) -> Data :: any() | {error, Reason :: binary()}. -tryOut(_QueueRef, _ConsToken) -> - ?NotLoaded. - -spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}. tryOuts(_QueueRef, _Cnt) -> ?NotLoaded. --spec tryOuts(QueueRef :: reference(), ConsToken :: any(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}. -tryOuts(_QueueRef, _ConsToken, _Cnt) -> - ?NotLoaded. - --spec tryOutByProd(QueueRef :: reference(), ProdToken :: any()) -> Data :: any() | {error, Reason :: binary()}. -tryOutByProd(_QueueRef, _ProdToken) -> - ?NotLoaded. - --spec tryOutsByProd(QueueRef :: reference(), ProdToken :: any(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}. -tryOutsByProd(_QueueRef, _ProdToken, _Cnt) -> - ?NotLoaded. - -spec size(QueueRef :: reference()) -> pos_integer() | {error, Reason :: binary()}. size(_QueueRef) -> ?NotLoaded.