浏览代码

ft: eLfq.cc 修改

master
SisMaker 3 年前
父节点
当前提交
1cdbe5424f
共有 2 个文件被更改,包括 49 次插入110 次删除
  1. +49
    -56
      c_src/eLfq/eLfq.cc
  2. +0
    -54
      src/eLfq.erl

+ 49
- 56
c_src/eLfq/eLfq.cc 查看文件

@ -2,7 +2,19 @@
#include "erl_nif.h"
#include "concurrentqueue.h"
using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary> *;
struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
// static const size_t BLOCK_SIZE = 8;
// static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 4;
// static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4;
// static const size_t INITIAL_IMPLCICIT_PRODUCER_HASH_SIZE = 1;
// static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 24;
static inline void* malloc(std::size_t size) { return enif_alloc(size); }
static inline void free(void* ptr) { enif_free(ptr); }
};
using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits> *;
ERL_NIF_TERM atomOk;
ERL_NIF_TERM atomError;
@ -35,24 +47,24 @@ void eLfqFree(ErlNifEnv *, void *obj) {
int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) {
ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
ErlNifResourceType *ResIns;
ErlNifResourceType *ResIns = NULL;
ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL);
if (NULL == ResIns)
return -1;
*priv_data = ResIns;
atomOk = enif_make_atom(env, "ok");
atomError = enif_make_atom(env, "error");
atomError = enif_make_atom(env, "lfq_error");
atomTrue = enif_make_atom(env, "true");
atomFalse = enif_make_atom(env, "false");
atomEmpty = enif_make_atom(env, "empty");
atomEmpty = enif_make_atom(env, "lfq_empty");
return 0;
}
int nifUpgrade(ErlNifEnv *env, void **priv_data, void **, ERL_NIF_TERM) {
ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
ErlNifResourceType *ResIns;
ErlNifResourceType *ResIns = NULL;
ResIns = enif_open_resource_type(env, NULL, "eLfqRes", eLfqFree, flags, NULL);
if (NULL == ResIns)
return -1;
@ -68,13 +80,13 @@ ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
lfqIns* ObjIns = static_cast<lfqIns *>(enif_alloc_resource(ResIns, sizeof(lfqIns)));
*ObjIns = new moodycamel::ConcurrentQueue<ErlNifBinary>;
*ObjIns = new moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits>;
if (ObjIns == NULL)
return make_error(env, "enif_alloc_resource failed");
ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns);
enif_release_resource(ResIns);
enif_release_resource(ObjIns);
return enif_make_tuple2(env, atomOk, RefTerm);
}
@ -99,32 +111,32 @@ ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
}
}
ERL_NIF_TERM nifIn3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
ERL_NIF_TERM nifIns3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
lfqIns *ObjIns = NULL;
ERL_NIF_TERM nifTryIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) {
return enif_make_badarg(env);
}
ERL_NIF_TERM nifTryIn3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
std::vector<ErlNifBinary> data;
for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
data.push_back(i);
}
ERL_NIF_TERM nifTryIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
}
ErlNifBinary TermBin;
if(!enif_term_to_binary(env, argv[1], &TermBin))
return enif_make_badarg(env);
ERL_NIF_TERM nifTryIns3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomTrue;
if ((*ObjIns)->enqueue_bulk(data.cbegin(), data.size())){
return atomTrue;
} else {
for(auto it = data.cbegin(), it != data.cend(), it++){
enif_release_binary(&TermBin);
}
return atomFalse;
}
}
ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
@ -146,57 +158,38 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomError;
} else{
enif_release_binary(&TermBin);
return enif_make_tuple2(env, atomOk, OutTerm);
return OutTerm;
}
} else {
return atomEmpty;
}
}
ERL_NIF_TERM nifTryOut2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
ERL_NIF_TERM nifTryOuts3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
ERL_NIF_TERM nifTryOutByProd2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
lfqIns *ObjIns = NULL;
ERL_NIF_TERM nifTryOutByProd3(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) {
return enif_make_badarg(env);
}
ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
size_t LfqSize = (*ObjIns)->size_approx();
return enif_make_long(env, (long int)LfqSize);
}
static ErlNifFunc nifFuncs[] =
{
{"new", 0, nifNew},
{"in", 2, nifIn2},
{"in", 3, nifIn3},
{"ins", 2, nifIns2},
{"ins", 3, nifIns3},
{"tryIn", 2, nifTryIn2},
{"tryIn", 3, nifTryIn3},
{"tryIns", 2, nifTryIns2},
{"tryIns", 3, nifTryIns3},
{"tryOut", 1, nifTryOut1},
{"tryOut", 2, nifTryOut2},
{"tryOuts", 2, nifTryOuts2},
{"tryOuts", 3, nifTryOuts3},
{"tryOutByProd", 2, nifTryOutByProd2},
{"tryOutsByProd", 3, nifTryOutByProd3},
{"size", 1, nifSize1}
};
ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload
)
ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload)

+ 0
- 54
src/eLfq.erl 查看文件

@ -10,25 +10,11 @@
% Allocates more memory if necessary
, in/2 % (item) : bool
, in/3 % (prod_token, item) : bool
, ins/2 % (item_first, count) : bool
, ins/3 % (prod_token, item_first, count) : bool
% Fails if not enough memory to enqueue
, tryIn/2 % (item) : bool
, tryIn/3 % (prod_token, item) : bool
, tryIns/2 % (item_first, count) : bool
, tryIns/3 % (prod_token, item_first, count) : bool
% Attempts to dequeue from the queue (never allocates)
, tryOut/1 % (item&) : bool
, tryOut/2 % (cons_token, item&) : bool
, tryOuts/2 % (item_first, max) : size_t
, tryOuts/3 % (cons_token, item_first, max) : size_t
% If you happen to know which producer you want to dequeue from
, tryOutByProd/2 % (prod_token, item&) : bool
, tryOutsByProd/3 % (prod_token, item_first, max) : size_t
% A not-necessarily-accurate count of the total number of elements
, size/1
@ -58,58 +44,18 @@ new() ->
in(_QueueRef, _Data) ->
?NotLoaded.
-spec in(QueueRef :: reference(), ProdToken :: any(), Data :: any()) -> true | {error, Reason :: binary()}.
in(_QueueRef, _ProdToken, _Data) ->
?NotLoaded.
-spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}.
ins(_QueueRef, _DataList) ->
?NotLoaded.
-spec ins(QueueRef :: reference(), ProdToken :: any(), DataList :: [any()]) -> true | {error, Reason :: binary()}.
ins(_QueueRef, _ProdToken, _DataList) ->
?NotLoaded.
-spec tryIn(QueueRef :: reference(), Data :: any()) -> true | {error, Reason :: binary()}.
tryIn(_QueueRef, _Data) ->
?NotLoaded.
-spec tryIn(QueueRef :: reference(), ProdToken :: any(), Data :: any()) -> true | {error, Reason :: binary()}.
tryIn(_QueueRef, _ProdToken, _Data) ->
?NotLoaded.
-spec tryIns(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}.
tryIns(_QueueRef, _DataList) ->
?NotLoaded.
-spec tryIns(QueueRef :: reference(), ProdToken :: any(), DataList :: [any()]) -> true | {error, Reason :: binary()}.
tryIns(_QueueRef, _ProdToken, _DataList) ->
?NotLoaded.
-spec tryOut(QueueRef :: reference()) -> Data :: any() | {error, Reason :: binary()}.
tryOut(_QueueRef) ->
?NotLoaded.
-spec tryOut(QueueRef :: reference(), ConsToken :: any()) -> Data :: any() | {error, Reason :: binary()}.
tryOut(_QueueRef, _ConsToken) ->
?NotLoaded.
-spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}.
tryOuts(_QueueRef, _Cnt) ->
?NotLoaded.
-spec tryOuts(QueueRef :: reference(), ConsToken :: any(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}.
tryOuts(_QueueRef, _ConsToken, _Cnt) ->
?NotLoaded.
-spec tryOutByProd(QueueRef :: reference(), ProdToken :: any()) -> Data :: any() | {error, Reason :: binary()}.
tryOutByProd(_QueueRef, _ProdToken) ->
?NotLoaded.
-spec tryOutsByProd(QueueRef :: reference(), ProdToken :: any(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}.
tryOutsByProd(_QueueRef, _ProdToken, _Cnt) ->
?NotLoaded.
-spec size(QueueRef :: reference()) -> pos_integer() | {error, Reason :: binary()}.
size(_QueueRef) ->
?NotLoaded.

正在加载...
取消
保存