From c7be84591434fb934244bdc342f0a71912b81840 Mon Sep 17 00:00:00 2001 From: AICells <1713699517@qq.com> Date: Wed, 22 Jan 2020 02:50:31 +0800 Subject: [PATCH] =?UTF-8?q?nif=E7=9B=B8=E5=85=B3=E6=95=B4=E7=90=86?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 7 +- c_src/Makefile | 15 + c_src/cq/Makefile | 73 ++++ c_src/cq/cq_nif.c | 564 +++++++++++++++++++++++++ c_src/cq/cq_nif.h | 71 ++++ c_src/cq1/Makefile | 73 ++++ c_src/cq1/cq_nif.c | 564 +++++++++++++++++++++++++ c_src/cq1/cq_nif.h | 71 ++++ c_src/cq2/Makefile | 73 ++++ c_src/cq2/cq_nif.c | 564 +++++++++++++++++++++++++ c_src/cq2/cq_nif.h | 71 ++++ rebar.config | 10 +- src/docs/erlangNif编程相关.md | 64 +++ src/docs/erlangVS环境编译nifdll.md | 27 ++ 14 files changed, 2243 insertions(+), 4 deletions(-) create mode 100644 c_src/Makefile create mode 100644 c_src/cq/Makefile create mode 100644 c_src/cq/cq_nif.c create mode 100644 c_src/cq/cq_nif.h create mode 100644 c_src/cq1/Makefile create mode 100644 c_src/cq1/cq_nif.c create mode 100644 c_src/cq1/cq_nif.h create mode 100644 c_src/cq2/Makefile create mode 100644 c_src/cq2/cq_nif.c create mode 100644 c_src/cq2/cq_nif.h create mode 100644 src/docs/erlangNif编程相关.md create mode 100644 src/docs/erlangVS环境编译nifdll.md diff --git a/.gitignore b/.gitignore index 479aef7..18df924 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ erl_crash.dump # rebar 2.x .rebar rel/example_project -ebin/*.beam +ebin/* deps # rebar 3 @@ -16,7 +16,10 @@ deps _build/ _checkouts/ rebar.lock +priv # idea .idea -*.iml \ No newline at end of file +*.iml +cmake-build* +CMakeLists.txt \ No newline at end of file diff --git a/c_src/Makefile b/c_src/Makefile new file mode 100644 index 0000000..1f80e4d --- /dev/null +++ b/c_src/Makefile @@ -0,0 +1,15 @@ +AllNifDirs :=$(shell ls -l | grep ^d | awk '{print $$9}') + +CurDir := $(shell pwd) +MdPriv := $(shell mkdir -p $(CurDir)/../priv/) + +ECHO: + @echo $(AllNifDirs) + +all:$(AllNifDirs) + +$(AllNifDirs):ECHO + @make -C $@ + +clean: + @for OneNifDir in $(AllNifDirs); do make -C $$OneNifDir clean; done diff --git a/c_src/cq/Makefile b/c_src/cq/Makefile new file mode 100644 index 0000000..b5844b0 --- /dev/null +++ b/c_src/cq/Makefile @@ -0,0 +1,73 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).") +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).") +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).") + +C_SRC_DIR = $(CURDIR) +C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei +LDFLAGS += -shared + +# Verbosity. + +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +$(C_SRC_OUTPUT): $(OBJECTS) + $(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(C_SRC_OUTPUT) $(OBJECTS) diff --git a/c_src/cq/cq_nif.c b/c_src/cq/cq_nif.c new file mode 100644 index 0000000..2f26a20 --- /dev/null +++ b/c_src/cq/cq_nif.c @@ -0,0 +1,564 @@ +#include +#include + +#include "erl_nif.h" +#include "cq_nif.h" + + +/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT +# error Requires dirty schedulers +#endif */ + + + + + +ERL_NIF_TERM +mk_atom(ErlNifEnv* env, const char* atom) +{ + ERL_NIF_TERM ret; + + if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) + return enif_make_atom(env, atom); + + return ret; +} + +ERL_NIF_TERM +mk_error(ErlNifEnv* env, const char* mesg) +{ + return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); +} + + +static ERL_NIF_TERM +queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); + if (q == NULL) + return mk_error(env, "priv_alloc_error"); + + ERL_NIF_TERM ret = enif_make_resource(env, q); + /* enif_release_resource(ret); */ + + uint32_t queue_id = 0; + uint32_t queue_size = 0; + uint32_t overflow_size = 0; + + if (!enif_get_uint(env, argv[0], &queue_id) || + !enif_get_uint(env, argv[1], &queue_size) || + !enif_get_uint(env, argv[2], &overflow_size)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "bad_queue_id"); + + /* TODO: Check that queue_size is power of 2 */ + + if (QUEUES[queue_id] != NULL) + return mk_error(env, "queue_id_already_exists"); + + q->id = queue_id; + q->queue_size = queue_size; + q->overflow_size = overflow_size; + q->tail = 0; + q->head = 0; + q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); + q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + + q->push_queue = new_queue(); + q->pop_queue = new_queue(); + + /* TODO: Check calloc return */ + + + for (int i = 0; i < q->queue_size; i++) { + ErlNifEnv *slot_env = enif_alloc_env(); + + q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; + //q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); + } + + QUEUES[q->id] = q; + + return enif_make_tuple2(env, mk_atom(env, "ok"), ret); +} + + +static ERL_NIF_TERM +queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + /* TODO: Free all the things! */ + QUEUES[queue_id] = NULL; + + return enif_make_atom(env, "ok"); + +} + +/* Push to the head of the queue. */ +static ERL_NIF_TERM +queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + /* Load the queue */ + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + + for (int i = 0; i < q->queue_size; i++) { + fprintf(stderr, "queue slot %d, index %d, state %d\n", + i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]); + } + + /* If there's consumers waiting, the queue must be empty and we + should directly pick a consumer to notify. */ + + ErlNifPid *waiting_consumer; + int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer); + if (dequeue_ret) { + ErlNifEnv *msg_env = enif_alloc_env(); + ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]); + ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy); + + if (enif_send(env, waiting_consumer, msg_env, tuple)) { + enif_free_env(msg_env); + return mk_atom(env, "ok"); + } else { + return mk_error(env, "notify_failed"); + } + } + + + + /* Increment head and attempt to claim the slot by marking it as + busy. This ensures no other thread will attempt to modify this + slot. If we cannot lock it, another thread must have */ + + uint64_t head = __sync_add_and_fetch(&q->head, 1); + size_t size = q->queue_size; + + while (1) { + uint64_t index = SLOT_INDEX(head, size); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_EMPTY, + STATE_WRITE); + + switch (ret) { + + case STATE_EMPTY: + head = __sync_add_and_fetch(&q->head, 1); + + case STATE_WRITE: + /* We acquired the write lock, go ahead with the write. */ + break; + + case STATE_FULL: + /* We have caught up with the tail and the buffer is + full. Block the producer until a consumer reads the + item. */ + return mk_error(env, "full_not_implemented"); + } + } + + /* If head catches up with tail, the queue is full. Add to + overflow instead */ + + + /* Copy term to slot-specific temporary process env. */ + ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]); + q->slots_terms[SLOT_INDEX(head, size)] = copy; + + __sync_synchronize(); /* Or compiler memory barrier? */ + + + /* TODO: Do we need to collect garbage? */ + + + /* Mark the slot ready to be consumed */ + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)], + STATE_WRITE, + STATE_FULL)) { + return mk_atom(env, "ok"); + } else { + return mk_error(env, "could_not_update_slots_after_insert"); + } + +} + + + +static ERL_NIF_TERM +queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + /* Load queue */ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + uint64_t qsize = q->queue_size; + uint64_t tail = q->tail; + uint64_t num_busy = 0; + + /* Walk the buffer starting the tail position until we are either + able to consume a term or find an empty slot. */ + while (1) { + uint64_t index = SLOT_INDEX(tail, qsize); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_FULL, + STATE_READ); + + if (ret == STATE_READ) { + /* We were able to mark the term as read in progress. We + now have an exclusive lock. */ + break; + + } else if (ret == STATE_WRITE) { + /* We found an item with a write in progress. If that + thread progresses, it will eventually mark the slot as + full. We can spin until that happens. + + This can take an arbitrary amount of time and multiple + reading threads will compete for the same slot. + + Instead we add the caller to the queue of blocking + consumers. When the next producer comes it will "help" + this thread by calling enif_send on the current + in-progress term *and* handle it's own terms. If + there's no new push to the queue, this will block + forever. */ + return mk_atom(env, "write_in_progress_not_implemented"); + + } else if (ret == STATE_EMPTY) { + /* We found an empty item. Queue must be empty. Add + calling Erlang consumer process to queue of waiting + processes. When the next producer comes along, it first + checks the waiting consumers and calls enif_send + instead of writing to the slots. */ + + ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid)); + pid = enif_self(env, pid); + enqueue(q->pop_queue, pid); + + return mk_atom(env, "wait_for_msg"); + + } else { + tail = __sync_add_and_fetch(&q->tail, 1); + } + } + + + /* Copy term into calling process env. The NIF env can now be + gargbage collected. */ + ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]); + + + /* Mark the slot as free. Note: We don't increment the tail + position here, as another thread also walking the buffer might + have incremented it multiple times */ + q->slots_terms[SLOT_INDEX(tail, qsize)] = 0; + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)], + STATE_READ, + STATE_EMPTY)) { + return enif_make_tuple2(env, mk_atom(env, "ok"), copy); + } else { + return mk_error(env, "could_not_update_slots_after_pop"); + } +} + + +static ERL_NIF_TERM +queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + + ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + for (int i = 0; i < q->queue_size; i++) { + slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); + + if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { + slots_terms[i] = mk_atom(env, "null"); + } else { + slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); + } + } + return enif_make_tuple4(env, + enif_make_uint64(env, q->tail), + enif_make_uint64(env, q->head), + enif_make_list_from_array(env, slots_states, q->queue_size), + enif_make_list_from_array(env, slots_terms, q->queue_size)); +} + +static ERL_NIF_TERM +queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + uint64_t pop_queue_size = 0; + cq_node_t *node = q->pop_queue->head; + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + while (node != NULL) { + pop_queue_size++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size); + + node = q->pop_queue->head; + node = Q_PTR(node); + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + uint64_t i = 0; + while (node != NULL) { + if (node->value == 0) { + pop_queue_pids[i] = mk_atom(env, "null"); + } + else { + pop_queue_pids[i] = enif_make_pid(env, node->value); + } + + i++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size); + enif_free(pop_queue_pids); + + return list; +} + + + +static ERL_NIF_TERM +print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint64_t *p1 = malloc(8); + *p1 = 0; + + + for (int bit = 63; bit >= 0; bit--) { + uint64_t power = 1 << bit; + //uint64_t byte = *p1; + uint64_t byte = p1; + fprintf(stderr, "%d", (byte & power) >> bit); + } + fprintf(stderr, "\n"); + + //enif_free(p1); + + return mk_atom(env, "ok"); +} + +void free_resource(ErlNifEnv* env, void* arg) +{ + //cq_t *cq = (cq_t *) arg; + + fprintf(stderr, "free_resource\n"); +} + + +cq_queue_t * new_queue() +{ + cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + node->next = NULL; + //node->env = NULL; + node->value = NULL; + queue->head = node; + queue->tail = node; + + return queue; +} + + + +void enqueue(cq_queue_t *queue, ErlNifPid *pid) +{ + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + //node->env = enif_alloc_env(); + //node->term = enif_make_copy(node->env, term); + node->value = pid; + node->next = NULL; + fprintf(stderr, "node %lu\n", node); + + cq_node_t *tail = NULL; + uint64_t tail_count = 0; + while (1) { + tail = queue->tail; + cq_node_t *tail_ptr = Q_PTR(tail); + tail_count = Q_COUNT(tail); + + cq_node_t *next = tail->next; + cq_node_t *next_ptr = Q_PTR(next); + uint64_t next_count = Q_COUNT(next); + + if (tail == queue->tail) { + fprintf(stderr, "tail == queue->tail\n"); + if (next_ptr == NULL) { + fprintf(stderr, "next_ptr == NULL\n"); + if (__sync_bool_compare_and_swap(&tail_ptr->next, + next, + Q_SET_COUNT(node, next_count+1))) + fprintf(stderr, "CAS(tail_ptr->next, next, (node, next_count+1)) -> true\n"); + break; + } else { + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, next_count+1)); + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, next_count+1))\n"); + } + } + } + + cq_node_t *node_with_count = Q_SET_COUNT(node, tail_count+1); + int ret = __sync_bool_compare_and_swap(&queue->tail, + tail, + node_with_count); + fprintf(stderr, "CAS(queue->tail, tail, %lu) -> %d\n", node_with_count, ret); +} + + +int dequeue(cq_queue_t *queue, ErlNifPid **pid) +{ + fprintf(stderr, "dequeue\n"); + cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr; + + while (1) { + head = queue->head; + head_ptr = Q_PTR(head); + tail = queue->tail; + tail_ptr = Q_PTR(tail); + next = head->next; + next_ptr = Q_PTR(next); + fprintf(stderr, "head %lu, tail %lu, next %lu\n", head, tail, next); + + if (head == queue->head) { + if (head_ptr == tail_ptr) { + if (next_ptr == NULL) { + return 0; /* Queue is empty */ + } + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n"); + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1)); + } else { + fprintf(stderr, "next->value %lu\n", next_ptr->value); + *pid = next_ptr->value; + fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n"); + if (__sync_bool_compare_and_swap(&queue->head, + head, + Q_SET_COUNT(next_ptr, Q_COUNT(head)+1))) + break; + } + } + } + // free pid + //enif_free(Q_PTR(head)); + return 1; +} + + + + +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + /* Initialize global array mapping id to cq_t ptr */ + QUEUES = (cq_t **) calloc(8, sizeof(cq_t **)); + if (QUEUES == NULL) + return -1; + + + ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); + CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq", + &free_resource, flags, NULL); + + if (CQ_RESOURCE == NULL) + return -1; + + return 0; +} + + +static ErlNifFunc nif_funcs[] = { + {"new" , 3, queue_new}, + {"free" , 1, queue_free}, + {"push" , 2, queue_push}, + {"async_pop", 1, queue_async_pop}, + {"debug" , 1, queue_debug}, + {"debug_poppers", 1, queue_debug_poppers}, + {"print_bits", 0, print_bits} +}; + +ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL); diff --git a/c_src/cq/cq_nif.h b/c_src/cq/cq_nif.h new file mode 100644 index 0000000..75f8891 --- /dev/null +++ b/c_src/cq/cq_nif.h @@ -0,0 +1,71 @@ +#include +#include "erl_nif.h" + + +#define CACHE_LINE_SIZE 64 + +#define SLOT_INDEX(__index, __size) __index & (__size - 1) + +#define Q_MASK 3L +#define Q_PTR(__ptr) (cq_node_t *) (((uint64_t)__ptr) & (~Q_MASK)) +#define Q_COUNT(__ptr) ((uint64_t) __ptr & Q_MASK) +#define Q_SET_COUNT(__ptr, __val) (cq_node_t *) ((uint64_t) __ptr | (__val & Q_MASK)) + + +#define STATE_EMPTY 0 +#define STATE_WRITE 1 +#define STATE_READ 2 +#define STATE_FULL 3 + + +ErlNifResourceType* CQ_RESOURCE; + +typedef struct cq_node cq_node_t; + +struct cq_node { + ErlNifEnv *env; + //ERL_NIF_TERM term; + ErlNifPid *value; + cq_node_t *next; +}; + + + +typedef struct cq_queue { + cq_node_t *head; + cq_node_t *tail; +} cq_queue_t; + + +// TODO: Add padding between the fields +typedef struct cq { + uint32_t id; + uint64_t queue_size; + uint64_t overflow_size; + uint64_t head; + uint64_t tail; + + uint8_t *slots_states; + ERL_NIF_TERM *slots_terms; + ErlNifEnv **slots_envs; + + cq_queue_t *push_queue; + cq_queue_t *pop_queue; + + uint8_t *overflow_states; + ERL_NIF_TERM *overflow_terms; + ErlNifEnv **overflow_envs; + +} cq_t; + +cq_t **QUEUES = NULL; /* Initialized on nif load */ + + +ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom); +ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg); +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info); +void free_resource(ErlNifEnv*, void*); + + +cq_queue_t* new_queue(void); +void enqueue(cq_queue_t *q, ErlNifPid *pid); diff --git a/c_src/cq1/Makefile b/c_src/cq1/Makefile new file mode 100644 index 0000000..b5844b0 --- /dev/null +++ b/c_src/cq1/Makefile @@ -0,0 +1,73 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).") +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).") +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).") + +C_SRC_DIR = $(CURDIR) +C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei +LDFLAGS += -shared + +# Verbosity. + +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +$(C_SRC_OUTPUT): $(OBJECTS) + $(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(C_SRC_OUTPUT) $(OBJECTS) diff --git a/c_src/cq1/cq_nif.c b/c_src/cq1/cq_nif.c new file mode 100644 index 0000000..2f26a20 --- /dev/null +++ b/c_src/cq1/cq_nif.c @@ -0,0 +1,564 @@ +#include +#include + +#include "erl_nif.h" +#include "cq_nif.h" + + +/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT +# error Requires dirty schedulers +#endif */ + + + + + +ERL_NIF_TERM +mk_atom(ErlNifEnv* env, const char* atom) +{ + ERL_NIF_TERM ret; + + if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) + return enif_make_atom(env, atom); + + return ret; +} + +ERL_NIF_TERM +mk_error(ErlNifEnv* env, const char* mesg) +{ + return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); +} + + +static ERL_NIF_TERM +queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); + if (q == NULL) + return mk_error(env, "priv_alloc_error"); + + ERL_NIF_TERM ret = enif_make_resource(env, q); + /* enif_release_resource(ret); */ + + uint32_t queue_id = 0; + uint32_t queue_size = 0; + uint32_t overflow_size = 0; + + if (!enif_get_uint(env, argv[0], &queue_id) || + !enif_get_uint(env, argv[1], &queue_size) || + !enif_get_uint(env, argv[2], &overflow_size)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "bad_queue_id"); + + /* TODO: Check that queue_size is power of 2 */ + + if (QUEUES[queue_id] != NULL) + return mk_error(env, "queue_id_already_exists"); + + q->id = queue_id; + q->queue_size = queue_size; + q->overflow_size = overflow_size; + q->tail = 0; + q->head = 0; + q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); + q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + + q->push_queue = new_queue(); + q->pop_queue = new_queue(); + + /* TODO: Check calloc return */ + + + for (int i = 0; i < q->queue_size; i++) { + ErlNifEnv *slot_env = enif_alloc_env(); + + q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; + //q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); + } + + QUEUES[q->id] = q; + + return enif_make_tuple2(env, mk_atom(env, "ok"), ret); +} + + +static ERL_NIF_TERM +queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + /* TODO: Free all the things! */ + QUEUES[queue_id] = NULL; + + return enif_make_atom(env, "ok"); + +} + +/* Push to the head of the queue. */ +static ERL_NIF_TERM +queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + /* Load the queue */ + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + + for (int i = 0; i < q->queue_size; i++) { + fprintf(stderr, "queue slot %d, index %d, state %d\n", + i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]); + } + + /* If there's consumers waiting, the queue must be empty and we + should directly pick a consumer to notify. */ + + ErlNifPid *waiting_consumer; + int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer); + if (dequeue_ret) { + ErlNifEnv *msg_env = enif_alloc_env(); + ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]); + ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy); + + if (enif_send(env, waiting_consumer, msg_env, tuple)) { + enif_free_env(msg_env); + return mk_atom(env, "ok"); + } else { + return mk_error(env, "notify_failed"); + } + } + + + + /* Increment head and attempt to claim the slot by marking it as + busy. This ensures no other thread will attempt to modify this + slot. If we cannot lock it, another thread must have */ + + uint64_t head = __sync_add_and_fetch(&q->head, 1); + size_t size = q->queue_size; + + while (1) { + uint64_t index = SLOT_INDEX(head, size); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_EMPTY, + STATE_WRITE); + + switch (ret) { + + case STATE_EMPTY: + head = __sync_add_and_fetch(&q->head, 1); + + case STATE_WRITE: + /* We acquired the write lock, go ahead with the write. */ + break; + + case STATE_FULL: + /* We have caught up with the tail and the buffer is + full. Block the producer until a consumer reads the + item. */ + return mk_error(env, "full_not_implemented"); + } + } + + /* If head catches up with tail, the queue is full. Add to + overflow instead */ + + + /* Copy term to slot-specific temporary process env. */ + ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]); + q->slots_terms[SLOT_INDEX(head, size)] = copy; + + __sync_synchronize(); /* Or compiler memory barrier? */ + + + /* TODO: Do we need to collect garbage? */ + + + /* Mark the slot ready to be consumed */ + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)], + STATE_WRITE, + STATE_FULL)) { + return mk_atom(env, "ok"); + } else { + return mk_error(env, "could_not_update_slots_after_insert"); + } + +} + + + +static ERL_NIF_TERM +queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + /* Load queue */ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + uint64_t qsize = q->queue_size; + uint64_t tail = q->tail; + uint64_t num_busy = 0; + + /* Walk the buffer starting the tail position until we are either + able to consume a term or find an empty slot. */ + while (1) { + uint64_t index = SLOT_INDEX(tail, qsize); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_FULL, + STATE_READ); + + if (ret == STATE_READ) { + /* We were able to mark the term as read in progress. We + now have an exclusive lock. */ + break; + + } else if (ret == STATE_WRITE) { + /* We found an item with a write in progress. If that + thread progresses, it will eventually mark the slot as + full. We can spin until that happens. + + This can take an arbitrary amount of time and multiple + reading threads will compete for the same slot. + + Instead we add the caller to the queue of blocking + consumers. When the next producer comes it will "help" + this thread by calling enif_send on the current + in-progress term *and* handle it's own terms. If + there's no new push to the queue, this will block + forever. */ + return mk_atom(env, "write_in_progress_not_implemented"); + + } else if (ret == STATE_EMPTY) { + /* We found an empty item. Queue must be empty. Add + calling Erlang consumer process to queue of waiting + processes. When the next producer comes along, it first + checks the waiting consumers and calls enif_send + instead of writing to the slots. */ + + ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid)); + pid = enif_self(env, pid); + enqueue(q->pop_queue, pid); + + return mk_atom(env, "wait_for_msg"); + + } else { + tail = __sync_add_and_fetch(&q->tail, 1); + } + } + + + /* Copy term into calling process env. The NIF env can now be + gargbage collected. */ + ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]); + + + /* Mark the slot as free. Note: We don't increment the tail + position here, as another thread also walking the buffer might + have incremented it multiple times */ + q->slots_terms[SLOT_INDEX(tail, qsize)] = 0; + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)], + STATE_READ, + STATE_EMPTY)) { + return enif_make_tuple2(env, mk_atom(env, "ok"), copy); + } else { + return mk_error(env, "could_not_update_slots_after_pop"); + } +} + + +static ERL_NIF_TERM +queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + + ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + for (int i = 0; i < q->queue_size; i++) { + slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); + + if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { + slots_terms[i] = mk_atom(env, "null"); + } else { + slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); + } + } + return enif_make_tuple4(env, + enif_make_uint64(env, q->tail), + enif_make_uint64(env, q->head), + enif_make_list_from_array(env, slots_states, q->queue_size), + enif_make_list_from_array(env, slots_terms, q->queue_size)); +} + +static ERL_NIF_TERM +queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + uint64_t pop_queue_size = 0; + cq_node_t *node = q->pop_queue->head; + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + while (node != NULL) { + pop_queue_size++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size); + + node = q->pop_queue->head; + node = Q_PTR(node); + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + uint64_t i = 0; + while (node != NULL) { + if (node->value == 0) { + pop_queue_pids[i] = mk_atom(env, "null"); + } + else { + pop_queue_pids[i] = enif_make_pid(env, node->value); + } + + i++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size); + enif_free(pop_queue_pids); + + return list; +} + + + +static ERL_NIF_TERM +print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint64_t *p1 = malloc(8); + *p1 = 0; + + + for (int bit = 63; bit >= 0; bit--) { + uint64_t power = 1 << bit; + //uint64_t byte = *p1; + uint64_t byte = p1; + fprintf(stderr, "%d", (byte & power) >> bit); + } + fprintf(stderr, "\n"); + + //enif_free(p1); + + return mk_atom(env, "ok"); +} + +void free_resource(ErlNifEnv* env, void* arg) +{ + //cq_t *cq = (cq_t *) arg; + + fprintf(stderr, "free_resource\n"); +} + + +cq_queue_t * new_queue() +{ + cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + node->next = NULL; + //node->env = NULL; + node->value = NULL; + queue->head = node; + queue->tail = node; + + return queue; +} + + + +void enqueue(cq_queue_t *queue, ErlNifPid *pid) +{ + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + //node->env = enif_alloc_env(); + //node->term = enif_make_copy(node->env, term); + node->value = pid; + node->next = NULL; + fprintf(stderr, "node %lu\n", node); + + cq_node_t *tail = NULL; + uint64_t tail_count = 0; + while (1) { + tail = queue->tail; + cq_node_t *tail_ptr = Q_PTR(tail); + tail_count = Q_COUNT(tail); + + cq_node_t *next = tail->next; + cq_node_t *next_ptr = Q_PTR(next); + uint64_t next_count = Q_COUNT(next); + + if (tail == queue->tail) { + fprintf(stderr, "tail == queue->tail\n"); + if (next_ptr == NULL) { + fprintf(stderr, "next_ptr == NULL\n"); + if (__sync_bool_compare_and_swap(&tail_ptr->next, + next, + Q_SET_COUNT(node, next_count+1))) + fprintf(stderr, "CAS(tail_ptr->next, next, (node, next_count+1)) -> true\n"); + break; + } else { + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, next_count+1)); + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, next_count+1))\n"); + } + } + } + + cq_node_t *node_with_count = Q_SET_COUNT(node, tail_count+1); + int ret = __sync_bool_compare_and_swap(&queue->tail, + tail, + node_with_count); + fprintf(stderr, "CAS(queue->tail, tail, %lu) -> %d\n", node_with_count, ret); +} + + +int dequeue(cq_queue_t *queue, ErlNifPid **pid) +{ + fprintf(stderr, "dequeue\n"); + cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr; + + while (1) { + head = queue->head; + head_ptr = Q_PTR(head); + tail = queue->tail; + tail_ptr = Q_PTR(tail); + next = head->next; + next_ptr = Q_PTR(next); + fprintf(stderr, "head %lu, tail %lu, next %lu\n", head, tail, next); + + if (head == queue->head) { + if (head_ptr == tail_ptr) { + if (next_ptr == NULL) { + return 0; /* Queue is empty */ + } + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n"); + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1)); + } else { + fprintf(stderr, "next->value %lu\n", next_ptr->value); + *pid = next_ptr->value; + fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n"); + if (__sync_bool_compare_and_swap(&queue->head, + head, + Q_SET_COUNT(next_ptr, Q_COUNT(head)+1))) + break; + } + } + } + // free pid + //enif_free(Q_PTR(head)); + return 1; +} + + + + +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + /* Initialize global array mapping id to cq_t ptr */ + QUEUES = (cq_t **) calloc(8, sizeof(cq_t **)); + if (QUEUES == NULL) + return -1; + + + ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); + CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq", + &free_resource, flags, NULL); + + if (CQ_RESOURCE == NULL) + return -1; + + return 0; +} + + +static ErlNifFunc nif_funcs[] = { + {"new" , 3, queue_new}, + {"free" , 1, queue_free}, + {"push" , 2, queue_push}, + {"async_pop", 1, queue_async_pop}, + {"debug" , 1, queue_debug}, + {"debug_poppers", 1, queue_debug_poppers}, + {"print_bits", 0, print_bits} +}; + +ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL); diff --git a/c_src/cq1/cq_nif.h b/c_src/cq1/cq_nif.h new file mode 100644 index 0000000..75f8891 --- /dev/null +++ b/c_src/cq1/cq_nif.h @@ -0,0 +1,71 @@ +#include +#include "erl_nif.h" + + +#define CACHE_LINE_SIZE 64 + +#define SLOT_INDEX(__index, __size) __index & (__size - 1) + +#define Q_MASK 3L +#define Q_PTR(__ptr) (cq_node_t *) (((uint64_t)__ptr) & (~Q_MASK)) +#define Q_COUNT(__ptr) ((uint64_t) __ptr & Q_MASK) +#define Q_SET_COUNT(__ptr, __val) (cq_node_t *) ((uint64_t) __ptr | (__val & Q_MASK)) + + +#define STATE_EMPTY 0 +#define STATE_WRITE 1 +#define STATE_READ 2 +#define STATE_FULL 3 + + +ErlNifResourceType* CQ_RESOURCE; + +typedef struct cq_node cq_node_t; + +struct cq_node { + ErlNifEnv *env; + //ERL_NIF_TERM term; + ErlNifPid *value; + cq_node_t *next; +}; + + + +typedef struct cq_queue { + cq_node_t *head; + cq_node_t *tail; +} cq_queue_t; + + +// TODO: Add padding between the fields +typedef struct cq { + uint32_t id; + uint64_t queue_size; + uint64_t overflow_size; + uint64_t head; + uint64_t tail; + + uint8_t *slots_states; + ERL_NIF_TERM *slots_terms; + ErlNifEnv **slots_envs; + + cq_queue_t *push_queue; + cq_queue_t *pop_queue; + + uint8_t *overflow_states; + ERL_NIF_TERM *overflow_terms; + ErlNifEnv **overflow_envs; + +} cq_t; + +cq_t **QUEUES = NULL; /* Initialized on nif load */ + + +ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom); +ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg); +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info); +void free_resource(ErlNifEnv*, void*); + + +cq_queue_t* new_queue(void); +void enqueue(cq_queue_t *q, ErlNifPid *pid); diff --git a/c_src/cq2/Makefile b/c_src/cq2/Makefile new file mode 100644 index 0000000..b5844b0 --- /dev/null +++ b/c_src/cq2/Makefile @@ -0,0 +1,73 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).") +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).") +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).") + +C_SRC_DIR = $(CURDIR) +C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei +LDFLAGS += -shared + +# Verbosity. + +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +$(C_SRC_OUTPUT): $(OBJECTS) + $(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(C_SRC_OUTPUT) $(OBJECTS) diff --git a/c_src/cq2/cq_nif.c b/c_src/cq2/cq_nif.c new file mode 100644 index 0000000..2f26a20 --- /dev/null +++ b/c_src/cq2/cq_nif.c @@ -0,0 +1,564 @@ +#include +#include + +#include "erl_nif.h" +#include "cq_nif.h" + + +/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT +# error Requires dirty schedulers +#endif */ + + + + + +ERL_NIF_TERM +mk_atom(ErlNifEnv* env, const char* atom) +{ + ERL_NIF_TERM ret; + + if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) + return enif_make_atom(env, atom); + + return ret; +} + +ERL_NIF_TERM +mk_error(ErlNifEnv* env, const char* mesg) +{ + return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); +} + + +static ERL_NIF_TERM +queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); + if (q == NULL) + return mk_error(env, "priv_alloc_error"); + + ERL_NIF_TERM ret = enif_make_resource(env, q); + /* enif_release_resource(ret); */ + + uint32_t queue_id = 0; + uint32_t queue_size = 0; + uint32_t overflow_size = 0; + + if (!enif_get_uint(env, argv[0], &queue_id) || + !enif_get_uint(env, argv[1], &queue_size) || + !enif_get_uint(env, argv[2], &overflow_size)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "bad_queue_id"); + + /* TODO: Check that queue_size is power of 2 */ + + if (QUEUES[queue_id] != NULL) + return mk_error(env, "queue_id_already_exists"); + + q->id = queue_id; + q->queue_size = queue_size; + q->overflow_size = overflow_size; + q->tail = 0; + q->head = 0; + q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); + q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); + q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); + + q->push_queue = new_queue(); + q->pop_queue = new_queue(); + + /* TODO: Check calloc return */ + + + for (int i = 0; i < q->queue_size; i++) { + ErlNifEnv *slot_env = enif_alloc_env(); + + q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; + //q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); + } + + QUEUES[q->id] = q; + + return enif_make_tuple2(env, mk_atom(env, "ok"), ret); +} + + +static ERL_NIF_TERM +queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + /* TODO: Free all the things! */ + QUEUES[queue_id] = NULL; + + return enif_make_atom(env, "ok"); + +} + +/* Push to the head of the queue. */ +static ERL_NIF_TERM +queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + /* Load the queue */ + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + + for (int i = 0; i < q->queue_size; i++) { + fprintf(stderr, "queue slot %d, index %d, state %d\n", + i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]); + } + + /* If there's consumers waiting, the queue must be empty and we + should directly pick a consumer to notify. */ + + ErlNifPid *waiting_consumer; + int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer); + if (dequeue_ret) { + ErlNifEnv *msg_env = enif_alloc_env(); + ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]); + ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy); + + if (enif_send(env, waiting_consumer, msg_env, tuple)) { + enif_free_env(msg_env); + return mk_atom(env, "ok"); + } else { + return mk_error(env, "notify_failed"); + } + } + + + + /* Increment head and attempt to claim the slot by marking it as + busy. This ensures no other thread will attempt to modify this + slot. If we cannot lock it, another thread must have */ + + uint64_t head = __sync_add_and_fetch(&q->head, 1); + size_t size = q->queue_size; + + while (1) { + uint64_t index = SLOT_INDEX(head, size); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_EMPTY, + STATE_WRITE); + + switch (ret) { + + case STATE_EMPTY: + head = __sync_add_and_fetch(&q->head, 1); + + case STATE_WRITE: + /* We acquired the write lock, go ahead with the write. */ + break; + + case STATE_FULL: + /* We have caught up with the tail and the buffer is + full. Block the producer until a consumer reads the + item. */ + return mk_error(env, "full_not_implemented"); + } + } + + /* If head catches up with tail, the queue is full. Add to + overflow instead */ + + + /* Copy term to slot-specific temporary process env. */ + ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]); + q->slots_terms[SLOT_INDEX(head, size)] = copy; + + __sync_synchronize(); /* Or compiler memory barrier? */ + + + /* TODO: Do we need to collect garbage? */ + + + /* Mark the slot ready to be consumed */ + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)], + STATE_WRITE, + STATE_FULL)) { + return mk_atom(env, "ok"); + } else { + return mk_error(env, "could_not_update_slots_after_insert"); + } + +} + + + +static ERL_NIF_TERM +queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + /* Load queue */ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + if (q->id != queue_id) + return mk_error(env, "not_identical_queue_id"); + + uint64_t qsize = q->queue_size; + uint64_t tail = q->tail; + uint64_t num_busy = 0; + + /* Walk the buffer starting the tail position until we are either + able to consume a term or find an empty slot. */ + while (1) { + uint64_t index = SLOT_INDEX(tail, qsize); + uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], + STATE_FULL, + STATE_READ); + + if (ret == STATE_READ) { + /* We were able to mark the term as read in progress. We + now have an exclusive lock. */ + break; + + } else if (ret == STATE_WRITE) { + /* We found an item with a write in progress. If that + thread progresses, it will eventually mark the slot as + full. We can spin until that happens. + + This can take an arbitrary amount of time and multiple + reading threads will compete for the same slot. + + Instead we add the caller to the queue of blocking + consumers. When the next producer comes it will "help" + this thread by calling enif_send on the current + in-progress term *and* handle it's own terms. If + there's no new push to the queue, this will block + forever. */ + return mk_atom(env, "write_in_progress_not_implemented"); + + } else if (ret == STATE_EMPTY) { + /* We found an empty item. Queue must be empty. Add + calling Erlang consumer process to queue of waiting + processes. When the next producer comes along, it first + checks the waiting consumers and calls enif_send + instead of writing to the slots. */ + + ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid)); + pid = enif_self(env, pid); + enqueue(q->pop_queue, pid); + + return mk_atom(env, "wait_for_msg"); + + } else { + tail = __sync_add_and_fetch(&q->tail, 1); + } + } + + + /* Copy term into calling process env. The NIF env can now be + gargbage collected. */ + ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]); + + + /* Mark the slot as free. Note: We don't increment the tail + position here, as another thread also walking the buffer might + have incremented it multiple times */ + q->slots_terms[SLOT_INDEX(tail, qsize)] = 0; + if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)], + STATE_READ, + STATE_EMPTY)) { + return enif_make_tuple2(env, mk_atom(env, "ok"), copy); + } else { + return mk_error(env, "could_not_update_slots_after_pop"); + } +} + + +static ERL_NIF_TERM +queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + + ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); + for (int i = 0; i < q->queue_size; i++) { + slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); + + if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { + slots_terms[i] = mk_atom(env, "null"); + } else { + slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); + } + } + return enif_make_tuple4(env, + enif_make_uint64(env, q->tail), + enif_make_uint64(env, q->head), + enif_make_list_from_array(env, slots_states, q->queue_size), + enif_make_list_from_array(env, slots_terms, q->queue_size)); +} + +static ERL_NIF_TERM +queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + uint32_t queue_id = 0; + + if (!enif_get_uint(env, argv[0], &queue_id)) + return mk_error(env, "badarg"); + + if (queue_id > 8) + return mk_error(env, "badarg"); + + cq_t *q = QUEUES[queue_id]; + if (q == NULL) + return mk_error(env, "bad_queue_id"); + + + uint64_t pop_queue_size = 0; + cq_node_t *node = q->pop_queue->head; + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + while (node != NULL) { + pop_queue_size++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size); + + node = q->pop_queue->head; + node = Q_PTR(node); + if (node->value == NULL) { + node = node->next; + node = Q_PTR(node); + } + + uint64_t i = 0; + while (node != NULL) { + if (node->value == 0) { + pop_queue_pids[i] = mk_atom(env, "null"); + } + else { + pop_queue_pids[i] = enif_make_pid(env, node->value); + } + + i++; + node = node->next; + node = Q_PTR(node); + } + + ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size); + enif_free(pop_queue_pids); + + return list; +} + + + +static ERL_NIF_TERM +print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + + uint64_t *p1 = malloc(8); + *p1 = 0; + + + for (int bit = 63; bit >= 0; bit--) { + uint64_t power = 1 << bit; + //uint64_t byte = *p1; + uint64_t byte = p1; + fprintf(stderr, "%d", (byte & power) >> bit); + } + fprintf(stderr, "\n"); + + //enif_free(p1); + + return mk_atom(env, "ok"); +} + +void free_resource(ErlNifEnv* env, void* arg) +{ + //cq_t *cq = (cq_t *) arg; + + fprintf(stderr, "free_resource\n"); +} + + +cq_queue_t * new_queue() +{ + cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + node->next = NULL; + //node->env = NULL; + node->value = NULL; + queue->head = node; + queue->tail = node; + + return queue; +} + + + +void enqueue(cq_queue_t *queue, ErlNifPid *pid) +{ + cq_node_t *node = enif_alloc(sizeof(cq_node_t)); + //node->env = enif_alloc_env(); + //node->term = enif_make_copy(node->env, term); + node->value = pid; + node->next = NULL; + fprintf(stderr, "node %lu\n", node); + + cq_node_t *tail = NULL; + uint64_t tail_count = 0; + while (1) { + tail = queue->tail; + cq_node_t *tail_ptr = Q_PTR(tail); + tail_count = Q_COUNT(tail); + + cq_node_t *next = tail->next; + cq_node_t *next_ptr = Q_PTR(next); + uint64_t next_count = Q_COUNT(next); + + if (tail == queue->tail) { + fprintf(stderr, "tail == queue->tail\n"); + if (next_ptr == NULL) { + fprintf(stderr, "next_ptr == NULL\n"); + if (__sync_bool_compare_and_swap(&tail_ptr->next, + next, + Q_SET_COUNT(node, next_count+1))) + fprintf(stderr, "CAS(tail_ptr->next, next, (node, next_count+1)) -> true\n"); + break; + } else { + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, next_count+1)); + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, next_count+1))\n"); + } + } + } + + cq_node_t *node_with_count = Q_SET_COUNT(node, tail_count+1); + int ret = __sync_bool_compare_and_swap(&queue->tail, + tail, + node_with_count); + fprintf(stderr, "CAS(queue->tail, tail, %lu) -> %d\n", node_with_count, ret); +} + + +int dequeue(cq_queue_t *queue, ErlNifPid **pid) +{ + fprintf(stderr, "dequeue\n"); + cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr; + + while (1) { + head = queue->head; + head_ptr = Q_PTR(head); + tail = queue->tail; + tail_ptr = Q_PTR(tail); + next = head->next; + next_ptr = Q_PTR(next); + fprintf(stderr, "head %lu, tail %lu, next %lu\n", head, tail, next); + + if (head == queue->head) { + if (head_ptr == tail_ptr) { + if (next_ptr == NULL) { + return 0; /* Queue is empty */ + } + fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n"); + __sync_bool_compare_and_swap(&queue->tail, + tail, + Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1)); + } else { + fprintf(stderr, "next->value %lu\n", next_ptr->value); + *pid = next_ptr->value; + fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n"); + if (__sync_bool_compare_and_swap(&queue->head, + head, + Q_SET_COUNT(next_ptr, Q_COUNT(head)+1))) + break; + } + } + } + // free pid + //enif_free(Q_PTR(head)); + return 1; +} + + + + +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + /* Initialize global array mapping id to cq_t ptr */ + QUEUES = (cq_t **) calloc(8, sizeof(cq_t **)); + if (QUEUES == NULL) + return -1; + + + ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); + CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq", + &free_resource, flags, NULL); + + if (CQ_RESOURCE == NULL) + return -1; + + return 0; +} + + +static ErlNifFunc nif_funcs[] = { + {"new" , 3, queue_new}, + {"free" , 1, queue_free}, + {"push" , 2, queue_push}, + {"async_pop", 1, queue_async_pop}, + {"debug" , 1, queue_debug}, + {"debug_poppers", 1, queue_debug_poppers}, + {"print_bits", 0, print_bits} +}; + +ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL); diff --git a/c_src/cq2/cq_nif.h b/c_src/cq2/cq_nif.h new file mode 100644 index 0000000..75f8891 --- /dev/null +++ b/c_src/cq2/cq_nif.h @@ -0,0 +1,71 @@ +#include +#include "erl_nif.h" + + +#define CACHE_LINE_SIZE 64 + +#define SLOT_INDEX(__index, __size) __index & (__size - 1) + +#define Q_MASK 3L +#define Q_PTR(__ptr) (cq_node_t *) (((uint64_t)__ptr) & (~Q_MASK)) +#define Q_COUNT(__ptr) ((uint64_t) __ptr & Q_MASK) +#define Q_SET_COUNT(__ptr, __val) (cq_node_t *) ((uint64_t) __ptr | (__val & Q_MASK)) + + +#define STATE_EMPTY 0 +#define STATE_WRITE 1 +#define STATE_READ 2 +#define STATE_FULL 3 + + +ErlNifResourceType* CQ_RESOURCE; + +typedef struct cq_node cq_node_t; + +struct cq_node { + ErlNifEnv *env; + //ERL_NIF_TERM term; + ErlNifPid *value; + cq_node_t *next; +}; + + + +typedef struct cq_queue { + cq_node_t *head; + cq_node_t *tail; +} cq_queue_t; + + +// TODO: Add padding between the fields +typedef struct cq { + uint32_t id; + uint64_t queue_size; + uint64_t overflow_size; + uint64_t head; + uint64_t tail; + + uint8_t *slots_states; + ERL_NIF_TERM *slots_terms; + ErlNifEnv **slots_envs; + + cq_queue_t *push_queue; + cq_queue_t *pop_queue; + + uint8_t *overflow_states; + ERL_NIF_TERM *overflow_terms; + ErlNifEnv **overflow_envs; + +} cq_t; + +cq_t **QUEUES = NULL; /* Initialized on nif load */ + + +ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom); +ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg); +int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info); +void free_resource(ErlNifEnv*, void*); + + +cq_queue_t* new_queue(void); +void enqueue(cq_queue_t *q, ErlNifPid *pid); diff --git a/rebar.config b/rebar.config index aaf5abc..4b556b2 100644 --- a/rebar.config +++ b/rebar.config @@ -1,2 +1,8 @@ -{erl_opts, [no_debug_info]}. -{deps, []}. \ No newline at end of file +{erl_opts, [no_debug_info, {i, "include"}]}. +{deps, []}. + +{pre_hooks, + [{"linux", compile, "make -C c_src all"}]}. + +{post_hooks, + [{"linux", clean, "make -C c_src clean"}]}. diff --git a/src/docs/erlangNif编程相关.md b/src/docs/erlangNif编程相关.md new file mode 100644 index 0000000..40ed442 --- /dev/null +++ b/src/docs/erlangNif编程相关.md @@ -0,0 +1,64 @@ +# 描述 + NIF库包含Erlang模块的某些功能的本机实现。像其他任何函数一样,调用本机实现的函数(NIF),与调用方没有任何区别。 + NIF库被构建为动态链接的库文件,并通过调用erlang:load_nif / 2在运行时加载。 + + 警告 + 谨慎使用此功能。 + 执行本机功能作为VM的本机代码的直接扩展。执行不是在安全的环境中进行的。VM 无法提供与执行Erlang代码时相同的服务, + 例如抢先式调度或内存保护。如果本机功能运行不正常,则整个VM都会出现异常。 + 崩溃的本机功能将使整个VM崩溃。 + 错误实现的本机功能可能会导致VM内部状态不一致,从而导致VM崩溃或在调用本机功能后的任何时候VM的其他异常行为。 + 在返回之前进行长时间工作的本机功能会降低VM的响应能力,并可能导致其他奇怪的行为。这种奇怪的行为包括但不限于极端的内存 + 使用情况以及调度程序之间的不良负载平衡。在Erlang / OTP发行版之间,由于冗长的工作而可能发生的奇怪行为也会有所不同。 + +# 简单示例 +``` + /* niftest.c */ + #include + static ERL_NIF_TERM hello(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) + { + return enif_make_string(env, "Hello world!", ERL_NIF_LATIN1); + } + static ErlNifFunc nif_funcs[] = + { + {"hello", 0, hello} + }; + ERL_NIF_INIT(niftest,nif_funcs,NULL,NULL,NULL,NULL) +``` +``` + -module(niftest). + -export([init/0, hello/0]). + -on_load(init/0). + init() -> + erlang:load_nif("./niftest", 0). + + hello() -> + erlang:nif_error("NIF library not loaded"). +``` + 在上面的示例中,使用了on_load指令,该命令功能是在加载模块时自动调用的指定的函数-init/0。 + init函数初始化依次调用erlang:load_nif / 2 , + 该加载器会加载NIF库,并用C中的本机实现替换hello函数。加载后,NIF库将保持不变。在清除它所属的模块代码版本之前,不会将其卸载。 + 如果在成功加载NIF库之前调用了该函数,则每个NIF必须具有用Erlang调用的实现。典型的此类存根实现是调用erlang:nif_error, + 这将引发异常。如果NIF库缺少某些操作系统或硬件体系结构的实现,则Erlang函数也可以用作后备实现。 + + 注意 + NIF不必导出,它可以在模块本地。但是,编译器会优化未使用的本地存根函数,从而导致NIF库的加载失败。 + +# 功能性 + NIF代码和Erlang运行时系统之间的所有交互都是通过调用NIF API函数来执行的。存在以下功能的功能: + 读写Erlang术语 + 任何Erlang术语都可以作为函数参数传递给NIF,并作为函数返回值返回。这些术语属于C类型的 ERL_NIF_TERM,只能使用API​​函数读取或写入。大部分用于读取术语内容的函数都以enif_get_为前缀,并且如果该术语属于预期类型(或非预期类型),则通常返回 true(或false)。编写术语的函数都带有enif_make_前缀 ,通常返回创建的ERL_NIF_TERM。还有一些查询术语的函数,例如enif_is_atom,enif_is_identical和enif_compare。 + + 类型的所有方面ERL_NIF_TERM属于类型的环境ErlNifEnv。术语的生存期由其环境对象的生存期控制。读取或写入术语的所有API函数都将术语所属的环境作为第一个函数参数 + +增加和减少资源的引用计数的次数必须匹配,否则可能引发问题。 +至此,持久资源的主要接口的实现就介绍完了,用户使用时,可以先通过enif_open_resource_type建立资源类型的描述符, +然后利用此描述符,使用enif_alloc_resource分配资源所占用的内存空间,使用enif_make_resource将资源导出到erlang模块层, +在进程间传递资源描述符,资源再传回NIF时,可以通过enif_get_resource取回资源描述符中的资源数据结构, +同时可以通过enif_keep_resource来共享资源,通过enif_release_resource来放弃使用资源,gc系统也会正确回收引用计数为0的资源, +开发者再也不用担心内存没有被正确释放了。 +持久资源为NIF的开发带来了极大的便利,用户可以将一些大规模的数据结构一次传入内存,生成一个资源描述符, +然后在进程间传递资源描述符而不是资源数据本身,减轻每次资源数据拷贝的开销,同时持久资源也是线程安全的, +写erlang程序也可以像写c程序一样高效了。 + + \ No newline at end of file diff --git a/src/docs/erlangVS环境编译nifdll.md b/src/docs/erlangVS环境编译nifdll.md new file mode 100644 index 0000000..b7cd0c3 --- /dev/null +++ b/src/docs/erlangVS环境编译nifdll.md @@ -0,0 +1,27 @@ +# window下编译nif dll 需要安装vs + +## 第一种 makefile配置(可参考jiffy的Makefile) 命令行下编译dll 需要设置vs相关环境变量 +具体要设置的环境变量可参考下面几个 +``` +path 新增 + D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\bin\Hostx64\x64 + +LIB + D:\Windows Kits\10\Lib\10.0.18362.0\ucrt\x64 + D:\Windows Kits\10\Lib\10.0.18362.0\ucrt\x64 + D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\lib\x64 + +INCLUDE + D:\Windows Kits\10\Include\10.0.18362.0\ucrt + D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\include +``` + +## 第二种 在vs单独编译 然后拷贝使用 + VS编译 + 1 新建空项目或者从现有代码创建项目 + 2 先选择 编辑框上边的 解决方案配置 与 解决方案平台 + 3 右键项目属性 设置 配置与第2步 解决方案配置 一样 设置 平台与第二步设置的 解决方案平台 一样 + 4 右键项目属性 配置属性 -> 常规 -> 配置类型 ->动态库(.dll) + 5 右键项目属性 配置属性 -> VC++目录 -> 包含目录 新增 D:\Program Files\erl10.6\erts-10.6\include + 6 右键项目属性 生成 + 注意编译使用的erlang include要合使用的erl版本对应 \ No newline at end of file