@ -0,0 +1,80 @@ | |||||
PROJECT = enlfq | |||||
CXXFLAGS = -std=c++11 -O2 -Wextra -Werror -Wno-missing-field-initializers -fno-rtti -fno-exceptions | |||||
LDLIBS = -lstdc++ | |||||
# Based on c_src.mk from erlang.mk by Loic Hoguin <essen@ninenines.eu> | |||||
CURDIR := $(shell pwd) | |||||
BASEDIR := $(abspath $(CURDIR)/..) | |||||
PROJECT ?= $(notdir $(BASEDIR)) | |||||
PROJECT := $(strip $(PROJECT)) | |||||
ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~ts/erts-~ts/include/\", [code:root_dir(), erlang:system_info(version)]).") | |||||
ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~ts\", [code:lib_dir(erl_interface, include)]).") | |||||
ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~ts\", [code:lib_dir(erl_interface, lib)]).") | |||||
C_SRC_DIR = $(CURDIR) | |||||
C_SRC_OUTPUT ?= $(CURDIR)/../priv/$(PROJECT).so | |||||
# System type and C compiler/flags. | |||||
UNAME_SYS := $(shell uname -s) | |||||
ifeq ($(UNAME_SYS), Darwin) | |||||
CC ?= cc | |||||
CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes | |||||
CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall | |||||
LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress | |||||
else ifeq ($(UNAME_SYS), FreeBSD) | |||||
CC ?= cc | |||||
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes | |||||
CXXFLAGS ?= -O3 -finline-functions -Wall | |||||
else ifeq ($(UNAME_SYS), Linux) | |||||
CC ?= gcc | |||||
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes | |||||
CXXFLAGS ?= -O3 -finline-functions -Wall | |||||
endif | |||||
CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) | |||||
CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) | |||||
LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei | |||||
LDFLAGS += -shared | |||||
# Verbosity. | |||||
c_verbose_0 = @echo " C " $(?F); | |||||
c_verbose = $(c_verbose_$(V)) | |||||
cpp_verbose_0 = @echo " CPP " $(?F); | |||||
cpp_verbose = $(cpp_verbose_$(V)) | |||||
link_verbose_0 = @echo " LD " $(@F); | |||||
link_verbose = $(link_verbose_$(V)) | |||||
SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) | |||||
OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) | |||||
COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c | |||||
COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c | |||||
$(C_SRC_OUTPUT): $(OBJECTS) | |||||
@mkdir -p $(BASEDIR)/priv/ | |||||
$(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT) | |||||
%.o: %.c | |||||
$(COMPILE_C) $(OUTPUT_OPTION) $< | |||||
%.o: %.cc | |||||
$(COMPILE_CPP) $(OUTPUT_OPTION) $< | |||||
%.o: %.C | |||||
$(COMPILE_CPP) $(OUTPUT_OPTION) $< | |||||
%.o: %.cpp | |||||
$(COMPILE_CPP) $(OUTPUT_OPTION) $< | |||||
clean: | |||||
@rm -f $(C_SRC_OUTPUT) $(OBJECTS) |
@ -0,0 +1,45 @@ | |||||
enlfq | |||||
===== | |||||
A simple NIF lock-free Queue using the library: [moodycamel::concurrentqueue](https://github.com/cameron314/concurrentqueue/tree/8f7e861dd9411a0bf77a6b9de83a47b3424fafba) | |||||
#### moodycamel::ConcurrentQueue | |||||
An industrial-strength lock-free queue for C++. | |||||
**Features**: | |||||
* Knock-your-socks-off blazing fast performance. | |||||
* Single-header implementation. Just drop it in your project. | |||||
* Fully thread-safe lock-free queue. Use concurrently from any number of threads. | |||||
* C++11 implementation -- elements are moved (instead of copied) where possible. | |||||
* Templated, obviating the need to deal exclusively with pointers -- memory is managed for you. | |||||
* No artificial limitations on element types or maximum count. | |||||
* Memory can be allocated once up-front, or dynamically as needed. | |||||
* Fully portable (no assembly; all is done through standard C++11 primitives). | |||||
* Supports super-fast bulk operations. | |||||
* Includes a low-overhead blocking version (BlockingConcurrentQueue). | |||||
* Exception safe. | |||||
Build | |||||
----- | |||||
$ rebar3 compile | |||||
Using | |||||
----- | |||||
```erlang | |||||
{ok, Q} = enlfq:new(). | |||||
T = {any, term, [], #{}, 1}. | |||||
true = enlfq:push(Q,T). | |||||
{ok, T} = enlfq:pop(Q). | |||||
empty = enlfq:pop(Q). | |||||
``` |
@ -0,0 +1,84 @@ | |||||
#include "enlfq.h" | |||||
#include "enlfq_nif.h" | |||||
#include "nif_utils.h" | |||||
#include "concurrentqueue.h" | |||||
struct q_item { | |||||
ErlNifEnv *env; | |||||
ERL_NIF_TERM term; | |||||
}; | |||||
struct squeue { | |||||
moodycamel::ConcurrentQueue<q_item> *queue; | |||||
}; | |||||
void nif_enlfq_free(ErlNifEnv *, void *obj) { | |||||
squeue *inst = static_cast<squeue *>(obj); | |||||
if (inst != nullptr) { | |||||
q_item item; | |||||
while (inst->queue->try_dequeue(item)) { | |||||
enif_free_env(item.env); | |||||
} | |||||
delete inst->queue; | |||||
} | |||||
} | |||||
ERL_NIF_TERM nif_enlfq_new(ErlNifEnv *env, int, const ERL_NIF_TERM *) { | |||||
shared_data *data = static_cast<shared_data *>(enif_priv_data(env)); | |||||
squeue *qinst = static_cast<squeue *>(enif_alloc_resource(data->resQueueInstance, sizeof(squeue))); | |||||
qinst->queue = new moodycamel::ConcurrentQueue<q_item>; | |||||
if (qinst == NULL) | |||||
return make_error(env, "enif_alloc_resource failed"); | |||||
ERL_NIF_TERM term = enif_make_resource(env, qinst); | |||||
enif_release_resource(qinst); | |||||
return enif_make_tuple2(env, ATOMS.atomOk, term); | |||||
} | |||||
ERL_NIF_TERM nif_enlfq_push(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { | |||||
shared_data *data = static_cast<shared_data *>(enif_priv_data(env)); | |||||
squeue *inst; | |||||
if (!enif_get_resource(env, argv[0], data->resQueueInstance, (void **) &inst)) { | |||||
return enif_make_badarg(env); | |||||
} | |||||
q_item item; | |||||
item.env = enif_alloc_env(); | |||||
item.term = enif_make_copy(item.env, argv[1]); | |||||
inst->queue->enqueue(item); | |||||
return ATOMS.atomTrue; | |||||
} | |||||
ERL_NIF_TERM nif_enlfq_pop(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) { | |||||
shared_data *data = static_cast<shared_data *>(enif_priv_data(env)); | |||||
squeue *inst = NULL; | |||||
if (!enif_get_resource(env, argv[0], data->resQueueInstance, (void **) &inst)) { | |||||
return enif_make_badarg(env); | |||||
} | |||||
ERL_NIF_TERM term; | |||||
q_item item; | |||||
if (inst->queue->try_dequeue(item)) { | |||||
term = enif_make_copy(env, item.term); | |||||
enif_free_env(item.env); | |||||
return enif_make_tuple2(env, ATOMS.atomOk, term); | |||||
} else { | |||||
return ATOMS.atomEmpty; | |||||
} | |||||
} |
@ -0,0 +1,10 @@ | |||||
#pragma once | |||||
#include "erl_nif.h" | |||||
extern "C" { | |||||
void nif_enlfq_free(ErlNifEnv *env, void *obj); | |||||
ERL_NIF_TERM nif_enlfq_new(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_enlfq_push(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_enlfq_pop(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); | |||||
} |
@ -0,0 +1,56 @@ | |||||
#include "enlfq_nif.h" | |||||
#include "enlfq.h" | |||||
#include "nif_utils.h" | |||||
const char kAtomOk[] = "ok"; | |||||
const char kAtomError[] = "error"; | |||||
const char kAtomTrue[] = "true"; | |||||
//const char kAtomFalse[] = "false"; | |||||
//const char kAtomUndefined[] = "undefined"; | |||||
const char kAtomEmpty[] = "empty"; | |||||
atoms ATOMS; | |||||
void open_resources(ErlNifEnv *env, shared_data *data) { | |||||
ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); | |||||
data->resQueueInstance = enif_open_resource_type(env, NULL, "enlfq_instance", nif_enlfq_free, flags, NULL); | |||||
} | |||||
int on_nif_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) { | |||||
ATOMS.atomOk = make_atom(env, kAtomOk); | |||||
ATOMS.atomError = make_atom(env, kAtomError); | |||||
ATOMS.atomTrue = make_atom(env, kAtomTrue); | |||||
// ATOMS.atomFalse = make_atom(env, kAtomFalse); | |||||
// ATOMS.atomUndefined = make_atom(env, kAtomUndefined); | |||||
ATOMS.atomEmpty = make_atom(env, kAtomEmpty); | |||||
shared_data *data = static_cast<shared_data *>(enif_alloc(sizeof(shared_data))); | |||||
open_resources(env, data); | |||||
*priv_data = data; | |||||
return 0; | |||||
} | |||||
void on_nif_unload(ErlNifEnv *, void *priv_data) { | |||||
shared_data *data = static_cast<shared_data *>(priv_data); | |||||
enif_free(data); | |||||
} | |||||
int on_nif_upgrade(ErlNifEnv *env, void **priv, void **, ERL_NIF_TERM) { | |||||
shared_data *data = static_cast<shared_data *>(enif_alloc(sizeof(shared_data))); | |||||
open_resources(env, data); | |||||
*priv = data; | |||||
return 0; | |||||
} | |||||
static ErlNifFunc nif_funcs[] = | |||||
{ | |||||
{"new", 0, nif_enlfq_new}, | |||||
{"push", 2, nif_enlfq_push}, | |||||
{"pop", 1, nif_enlfq_pop} | |||||
}; | |||||
ERL_NIF_INIT(enlfq, nif_funcs, on_nif_load, NULL, on_nif_upgrade, on_nif_unload) | |||||
@ -0,0 +1,19 @@ | |||||
#pragma once | |||||
#include "erl_nif.h" | |||||
struct atoms | |||||
{ | |||||
ERL_NIF_TERM atomOk; | |||||
ERL_NIF_TERM atomError; | |||||
ERL_NIF_TERM atomTrue; | |||||
// ERL_NIF_TERM atomFalse; | |||||
// ERL_NIF_TERM atomUndefined; | |||||
ERL_NIF_TERM atomEmpty; | |||||
}; | |||||
struct shared_data | |||||
{ | |||||
ErlNifResourceType* resQueueInstance; | |||||
}; | |||||
extern atoms ATOMS; |
@ -0,0 +1,27 @@ | |||||
#include "nif_utils.h" | |||||
#include "enlfq_nif.h" | |||||
#include <string.h> | |||||
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name) | |||||
{ | |||||
ERL_NIF_TERM ret; | |||||
if(enif_make_existing_atom(env, name, &ret, ERL_NIF_LATIN1)) | |||||
return ret; | |||||
return enif_make_atom(env, name); | |||||
} | |||||
ERL_NIF_TERM make_binary(ErlNifEnv* env, const char* buff, size_t length) | |||||
{ | |||||
ERL_NIF_TERM term; | |||||
unsigned char *destination_buffer = enif_make_new_binary(env, length, &term); | |||||
memcpy(destination_buffer, buff, length); | |||||
return term; | |||||
} | |||||
ERL_NIF_TERM make_error(ErlNifEnv* env, const char* error) | |||||
{ | |||||
return enif_make_tuple2(env, ATOMS.atomError, make_binary(env, error, strlen(error))); | |||||
} |
@ -0,0 +1,6 @@ | |||||
#pragma once | |||||
#include "erl_nif.h" | |||||
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name); | |||||
ERL_NIF_TERM make_error(ErlNifEnv* env, const char* error); | |||||
ERL_NIF_TERM make_binary(ErlNifEnv* env, const char* buff, size_t length); |
@ -0,0 +1,7 @@ | |||||
{port_specs, [ | |||||
{"../../priv/enlfq.so", ["*.cc"]} | |||||
]}. | |||||
@ -0,0 +1,71 @@ | |||||
-module(benchmark). | |||||
-author("silviu.caragea"). | |||||
-export([ | |||||
benchmark_serial/2, | |||||
benchmark_concurrent/3 | |||||
]). | |||||
benchmark_serial(Elements, MaxPriority) -> | |||||
rand:uniform(), %just to init the seed | |||||
{ok, Q} = enlfq:new(), | |||||
{T0, ok} = timer:tc(fun() -> insert_none(Elements, MaxPriority) end), | |||||
{T1, ok} = timer:tc(fun() -> insert_item(Elements, Q, MaxPriority) end), | |||||
{T2, ok} = timer:tc(fun() -> remove_item(Q) end), | |||||
T0Ms = T0/1000, | |||||
T1Ms = T1/1000, | |||||
T2Ms = T2/1000, | |||||
io:format(<<"insert overhead: ~p ms insert time: ~p ms pop time: ~p ms ~n">>, [T0Ms, T1Ms, T2Ms]). | |||||
benchmark_concurrent(Procs, Elements, MaxPriority) -> | |||||
{ok, Q} = enlfq:new(), | |||||
ElsPerProcess = round(Elements/Procs), | |||||
InsertNoneWorkFun = fun() -> | |||||
insert_none(ElsPerProcess, MaxPriority) | |||||
end, | |||||
InsertWorkFun = fun() -> | |||||
insert_item(ElsPerProcess, Q, MaxPriority) | |||||
end, | |||||
RemoveWorkFun = fun() -> | |||||
remove_item(Q) | |||||
end, | |||||
{T0, _} = timer:tc(fun()-> multi_spawn:do_work(InsertNoneWorkFun, Procs) end), | |||||
{T1, _} = timer:tc(fun()-> multi_spawn:do_work(InsertWorkFun, Procs) end), | |||||
{T2, _} = timer:tc(fun()-> multi_spawn:do_work(RemoveWorkFun, Procs) end), | |||||
T0Ms = T0/1000, | |||||
T1Ms = T1/1000, | |||||
T2Ms = T2/1000, | |||||
io:format(<<"insert overhead: ~p ms insert time: ~p ms pop time: ~p ms ~n">>, [T0Ms, T1Ms, T2Ms]). | |||||
insert_item(0, _Q, _Max) -> | |||||
ok; | |||||
insert_item(N, Q, Max) -> | |||||
%% El = rand:uniform(Max), | |||||
true = enlfq:push(Q,{}), | |||||
insert_item(N-1, Q, Max). | |||||
remove_item(Q) -> | |||||
case enlfq:pop(Q) of | |||||
empty -> | |||||
ok; | |||||
{ok, _} -> | |||||
remove_item(Q) | |||||
end. | |||||
insert_none(0, _Max) -> | |||||
ok; | |||||
insert_none(N, Max) -> | |||||
%% rand:uniform(Max), | |||||
insert_none(N-1, Max). | |||||
@ -0,0 +1,51 @@ | |||||
-module(enlfq). | |||||
-on_load(load_nif/0). | |||||
-define(NOT_LOADED, not_loaded(?LINE)). | |||||
%% API exports | |||||
-export([new/0, push/2, pop/1]). | |||||
%%==================================================================== | |||||
%% API functions | |||||
%%==================================================================== | |||||
-spec(new() -> {ok, QueueRef :: reference()} | badarg | {error, Reason :: binary()}). | |||||
new() -> | |||||
?NOT_LOADED. | |||||
-spec(push(QueueRef :: reference(), Data :: any()) -> | |||||
true | {error, Reason :: binary()}). | |||||
push(_QueueRef, _Data) -> | |||||
?NOT_LOADED. | |||||
-spec(pop(QueueRef :: reference()) -> | |||||
{ok, Data :: any()} | empty | {error, Reason :: binary()}). | |||||
pop(_QueueRef) -> | |||||
?NOT_LOADED. | |||||
%%==================================================================== | |||||
%% Internal functions | |||||
%%==================================================================== | |||||
%% nif functions | |||||
load_nif() -> | |||||
SoName = get_priv_path(?MODULE), | |||||
io:format(<<"Loading library: ~p ~n">>, [SoName]), | |||||
ok = erlang:load_nif(SoName, 0). | |||||
get_priv_path(File) -> | |||||
case code:priv_dir(?MODULE) of | |||||
{error, bad_name} -> | |||||
Ebin = filename:dirname(code:which(?MODULE)), | |||||
filename:join([filename:dirname(Ebin), "priv", File]); | |||||
Dir -> | |||||
filename:join(Dir, File) | |||||
end. | |||||
not_loaded(Line) -> | |||||
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}). |
@ -0,0 +1,23 @@ | |||||
-module(multi_spawn). | |||||
-author("silviu.caragea"). | |||||
-export([do_work/2]). | |||||
do_work(Fun, Count) -> | |||||
process_flag(trap_exit, true), | |||||
spawn_childrens(Fun, Count), | |||||
wait_responses(Count). | |||||
spawn_childrens(_Fun, 0) -> | |||||
ok; | |||||
spawn_childrens(Fun, Count) -> | |||||
spawn_link(Fun), | |||||
spawn_childrens(Fun, Count -1). | |||||
wait_responses(0) -> | |||||
ok; | |||||
wait_responses(Count) -> | |||||
receive | |||||
{'EXIT',_FromPid, _Reason} -> | |||||
wait_responses(Count -1) | |||||
end. |