Ver a proveniência

ft: 完善

master
SisMaker há 3 anos
ascendente
cometimento
fcc6fe3070
2 ficheiros alterados com 109 adições e 50 eliminações
  1. +95
    -43
      c_src/eLfq/eLfq.cc
  2. +14
    -7
      src/eLfq.erl

+ 95
- 43
c_src/eLfq/eLfq.cc Ver ficheiro

@ -1,38 +1,29 @@
#include <string.h>
#include "erl_nif.h"
#include "concurrentqueue.h"
#include <vector>
struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits {
// static const size_t BLOCK_SIZE = 8;
// static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 4;
// static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4;
// static const size_t INITIAL_IMPLCICIT_PRODUCER_HASH_SIZE = 1;
// static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 24;
static inline void* malloc(std::size_t size) { return enif_alloc(size); }
static inline void free(void* ptr) { enif_free(ptr); }
static inline void *malloc(std::size_t size) { return enif_alloc(size); }
static inline void free(void *ptr) { enif_free(ptr); }
};
using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits> *;
using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits> *;
ERL_NIF_TERM atomOk;
ERL_NIF_TERM atomError;
ERL_NIF_TERM atomNewErr;
ERL_NIF_TERM atomTrue;
ERL_NIF_TERM atomFalse;
ERL_NIF_TERM atomEmpty;
ERL_NIF_TERM make_binary(ErlNifEnv *env, const char *buff, size_t length) {
ERL_NIF_TERM term;
unsigned char *destination_buffer = enif_make_new_binary(env, length, &term);
memcpy(destination_buffer, buff, length);
return term;
}
ERL_NIF_TERM make_error(ErlNifEnv *env, const char *error) {
return enif_make_tuple2(env, atomError, make_binary(env, error, strlen(error)));
}
void eLfqFree(ErlNifEnv *, void *obj) {
lfqIns *ObjIns = static_cast<lfqIns *>(obj);
if (ObjIns != nullptr) {
@ -54,11 +45,11 @@ int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) {
*priv_data = ResIns;
atomOk = enif_make_atom(env, "ok");
atomError = enif_make_atom(env, "lfq_error");
atomTrue = enif_make_atom(env, "true");
atomFalse = enif_make_atom(env, "false");
atomError = enif_make_atom(env, "lfq_error");
atomEmpty = enif_make_atom(env, "lfq_empty");
atomNewErr = enif_make_atom(env, "error");
return 0;
}
@ -79,17 +70,38 @@ void nifUnload(ErlNifEnv *, void *priv_data) {
ERL_NIF_TERM nifNew(ErlNifEnv *env, int, const ERL_NIF_TERM *) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
lfqIns* ObjIns = static_cast<lfqIns *>(enif_alloc_resource(ResIns, sizeof(lfqIns)));
lfqIns *ObjIns = static_cast<lfqIns *>(enif_alloc_resource(ResIns, sizeof(lfqIns)));
*ObjIns = new moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits>;
if (ObjIns == NULL)
return make_error(env, "enif_alloc_resource failed");
return atomNewErr;
ERL_NIF_TERM RefTerm = enif_make_resource(env, ObjIns);
enif_release_resource(ObjIns);
return enif_make_tuple2(env, atomOk, RefTerm);
}
ERL_NIF_TERM nifDel1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
lfqIns *ObjIns = NULL;
if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) {
return enif_make_badarg(env);
}
if (NULL != ObjIns) {
ErlNifBinary TermBin;
while ((*ObjIns)->try_dequeue(TermBin)) {
enif_release_binary(&TermBin);
}
delete ObjIns;
}
return atomOk;
}
ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
@ -100,10 +112,10 @@ ERL_NIF_TERM nifIn2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
}
ErlNifBinary TermBin;
if(!enif_term_to_binary(env, argv[1], &TermBin))
if (!enif_term_to_binary(env, argv[1], &TermBin))
return enif_make_badarg(env);
if ((*ObjIns)->enqueue(TermBin)){
if ((*ObjIns)->enqueue(TermBin)) {
return atomTrue;
} else {
enif_release_binary(&TermBin);
@ -120,20 +132,36 @@ ERL_NIF_TERM nifIns2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return enif_make_badarg(env);
}
std::vector<ErlNifBinary> data;
for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
data.push_back(i);
ERL_NIF_TERM List;
ERL_NIF_TERM Head;
List = argv[1];
if (!enif_is_list(env, List)) {
return enif_make_badarg(env);
}
unsigned ListLen;
enif_get_list_length(env, List, &ListLen);
std::vector <ErlNifBinary> TermBinList;
ErlNifBinary TermBin;
if(!enif_term_to_binary(env, argv[1], &TermBin))
return enif_make_badarg(env);
while (enif_get_list_cell(env, List, &Head, &List)) {
if (!enif_term_to_binary(env, Head, &TermBin)) {
for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) {
enif_release_binary(&(*it));
}
return enif_make_badarg(env);
}
if ((*ObjIns)->enqueue_bulk(data.cbegin(), data.size())){
TermBinList.push_back(TermBin);
}
if ((*ObjIns)->enqueue_bulk(TermBinList.cbegin(), TermBinList.size())) {
return atomTrue;
} else {
for(auto it = data.cbegin(), it != data.cend(), it++){
enif_release_binary(&TermBin);
for (auto it = TermBinList.begin(); it != TermBinList.end(); ++it) {
enif_release_binary(&(*it));
}
return atomFalse;
}
@ -151,12 +179,10 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
if ((*ObjIns)->try_dequeue(TermBin)) {
ERL_NIF_TERM OutTerm;
if(enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0)
{
if (enif_binary_to_term(env, TermBin.data, TermBin.size, &OutTerm, 0) == 0) {
enif_release_binary(&TermBin);
// return make_error(env, atomError "failed to decode data");
return atomError;
} else{
} else {
enif_release_binary(&TermBin);
return OutTerm;
}
@ -166,9 +192,34 @@ ERL_NIF_TERM nifTryOut1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
}
ERL_NIF_TERM nifTryOuts2(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atomEmpty;
}
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
lfqIns *ObjIns = NULL;
if (!enif_get_resource(env, argv[0], ResIns, (void **) &ObjIns)) {
return enif_make_badarg(env);
}
unsigned int OutLen;
if (!enif_get_uint(env, argv[1], &OutLen)) {
return enif_make_badarg(env);
}
std::vector <ErlNifBinary> TermBinList(OutLen);
size_t OutSize = (*ObjIns)->try_dequeue_bulk(TermBinList.begin(), TermBinList.size());
ERL_NIF_TERM RetList = enif_make_list(env, 0);
ERL_NIF_TERM OutTerm;
for (int i = OutSize - 1; i >= 0; i--) {
if (enif_binary_to_term(env, TermBinList[i].data, TermBinList[i].size, &OutTerm, 0) == 0) {
enif_release_binary(&TermBinList[i]);
} else {
enif_release_binary(&TermBinList[i]);
RetList = enif_make_list_cell(env, OutTerm, RetList);
}
}
return RetList;
}
ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
ErlNifResourceType *ResIns = static_cast<ErlNifResourceType *>(enif_priv_data(env));
@ -179,17 +230,18 @@ ERL_NIF_TERM nifSize1(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
}
size_t LfqSize = (*ObjIns)->size_approx();
return enif_make_long(env, (long int)LfqSize);
return enif_make_long(env, (long int) LfqSize);
}
static ErlNifFunc nifFuncs[] =
{
{"new", 0, nifNew},
{"in", 2, nifIn2},
{"ins", 2, nifIns2},
{"tryOut", 1, nifTryOut1},
{"tryOuts", 2, nifTryOuts2},
{"size", 1, nifSize1}
{"new", 0, nifNew},
{"del", 1, nifDel1},
{"in", 2, nifIn2},
{"ins", 2, nifIns2},
{"tryOut", 1, nifTryOut1},
{"tryOuts", 2, nifTryOuts2},
{"size", 1, nifSize1}
};
ERL_NIF_INIT(eLfq, nifFuncs, nifLoad, NULL, nifUpgrade, nifUnload)

+ 14
- 7
src/eLfq.erl Ver ficheiro

@ -8,6 +8,9 @@
% create the queue
new/0
% delete the queue
, del/1
% Allocates more memory if necessary
, in/2 % (item) : bool
, ins/2 % (item_first, count) : bool
@ -36,26 +39,30 @@ init() ->
end,
erlang:load_nif(SoName, 0).
-spec new() -> {ok, QueueRef :: reference()} | badarg | {error, Reason :: binary()}.
-spec new() -> {ok, QueueRef :: reference()} | error.
new() ->
?NotLoaded.
-spec in(QueueRef :: reference(), Data :: any()) -> true | {error, Reason :: binary()}.
-spec del(QueueRef :: reference()) -> ok.
del(_QueueRef) ->
?NotLoaded.
-spec in(QueueRef :: reference(), Data :: any()) -> true | false.
in(_QueueRef, _Data) ->
?NotLoaded.
-spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | {error, Reason :: binary()}.
-spec ins(QueueRef :: reference(), DataList :: [any()]) -> true | false.
ins(_QueueRef, _DataList) ->
?NotLoaded.
-spec tryOut(QueueRef :: reference()) -> Data :: any() | {error, Reason :: binary()}.
-spec tryOut(QueueRef :: reference()) -> Data :: any() | lfq_empty | lfq_error.
tryOut(_QueueRef) ->
?NotLoaded.
-spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()] | {error, Reason :: binary()}.
-spec tryOuts(QueueRef :: reference(), Cnt :: pos_integer()) -> DataList :: [any()].
tryOuts(_QueueRef, _Cnt) ->
?NotLoaded.
-spec size(QueueRef :: reference()) -> pos_integer() | {error, Reason :: binary()}.
-spec size(QueueRef :: reference()) -> pos_integer().
size(_QueueRef) ->
?NotLoaded.
?NotLoaded.

Carregando…
Cancelar
Guardar