@ -0,0 +1,15 @@ | |||||
# Build every NIF sub-directory and collect the shared objects in ../priv.

# One sub-directory per NIF. $(wildcard */.) lists directories directly;
# the old `ls -l | grep ^d | awk` pipeline was fragile (locale/format
# dependent) and forked three processes.
AllNifDirs := $(patsubst %/.,%,$(wildcard */.))
CurDir := $(CURDIR)

# Ensure the shared priv/ output directory exists at parse time, before
# any sub-make links into it.
MdPriv := $(shell mkdir -p $(CurDir)/../priv/)

.PHONY: all ECHO clean $(AllNifDirs)

# BUG FIX: `all` must come first — previously ECHO was the first rule,
# so a bare `make` only echoed the directory list and built nothing.
all: $(AllNifDirs)

ECHO:
	@echo $(AllNifDirs)

# Use $(MAKE), not `make`, so -j/-n and the jobserver propagate.
$(AllNifDirs): ECHO
	@$(MAKE) -C $@

clean:
	@for OneNifDir in $(AllNifDirs); do $(MAKE) -C $$OneNifDir clean; done
@ -0,0 +1,73 @@ | |||||
# Based on c_src.mk from erlang.mk by Loic Hoguin <essen@ninenines.eu>
# Compiles every C/C++ file in this directory into one NIF shared
# object under ../../priv/, named after the directory (PROJECT).
CURDIR := $(shell pwd)
BASEDIR := $(abspath $(CURDIR))
PROJECT ?= $(notdir $(BASEDIR))
PROJECT := $(strip $(PROJECT))
# Ask the `erl` in PATH where its ERTS and erl_interface trees live.
ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).")
ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).")
ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).")
C_SRC_DIR = $(CURDIR)
C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so
# System type and C compiler/flags.
UNAME_SYS := $(shell uname -s)
ifeq ($(UNAME_SYS), Darwin)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall
LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress
else ifeq ($(UNAME_SYS), FreeBSD)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
else ifeq ($(UNAME_SYS), Linux)
CC ?= gcc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
endif
CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
# NOTE(review): liberl_interface was removed in OTP 23; drop
# -lerl_interface (keep -lei) when targeting modern OTP releases.
LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei
LDFLAGS += -shared
# Verbosity: V=0 (default) prints terse C/CPP/LD lines; any other V
# echoes the full command line.
c_verbose_0 = @echo " C " $(?F);
c_verbose = $(c_verbose_$(V))
cpp_verbose_0 = @echo " CPP " $(?F);
cpp_verbose = $(cpp_verbose_$(V))
link_verbose_0 = @echo " LD " $(@F);
link_verbose = $(link_verbose_$(V))
SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \))
OBJECTS = $(addsuffix .o, $(basename $(SOURCES)))
COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c
COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c
$(C_SRC_OUTPUT): $(OBJECTS)
	@mkdir -p $(dir $(C_SRC_OUTPUT))
	$(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT)
%.o: %.c
	$(COMPILE_C) $(OUTPUT_OPTION) $<
%.o: %.cc
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
%.o: %.C
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
%.o: %.cpp
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
# clean is a command, not a file: declare it phony so a stray file
# named "clean" cannot break it.
.PHONY: clean
clean:
	@rm -f $(C_SRC_OUTPUT) $(OBJECTS)
@ -0,0 +1,564 @@ | |||||
#include <stdio.h> | |||||
#include <unistd.h> | |||||
#include "erl_nif.h" | |||||
#include "cq_nif.h" | |||||
/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT | |||||
# error Requires dirty schedulers | |||||
#endif */ | |||||
ERL_NIF_TERM | |||||
mk_atom(ErlNifEnv* env, const char* atom) | |||||
{ | |||||
ERL_NIF_TERM ret; | |||||
if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) | |||||
return enif_make_atom(env, atom); | |||||
return ret; | |||||
} | |||||
ERL_NIF_TERM | |||||
mk_error(ErlNifEnv* env, const char* mesg) | |||||
{ | |||||
return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); | |||||
if (q == NULL) | |||||
return mk_error(env, "priv_alloc_error"); | |||||
ERL_NIF_TERM ret = enif_make_resource(env, q); | |||||
/* enif_release_resource(ret); */ | |||||
uint32_t queue_id = 0; | |||||
uint32_t queue_size = 0; | |||||
uint32_t overflow_size = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id) || | |||||
!enif_get_uint(env, argv[1], &queue_size) || | |||||
!enif_get_uint(env, argv[2], &overflow_size)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Check that queue_size is power of 2 */ | |||||
if (QUEUES[queue_id] != NULL) | |||||
return mk_error(env, "queue_id_already_exists"); | |||||
q->id = queue_id; | |||||
q->queue_size = queue_size; | |||||
q->overflow_size = overflow_size; | |||||
q->tail = 0; | |||||
q->head = 0; | |||||
q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); | |||||
q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->push_queue = new_queue(); | |||||
q->pop_queue = new_queue(); | |||||
/* TODO: Check calloc return */ | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
ErlNifEnv *slot_env = enif_alloc_env(); | |||||
q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; | |||||
//q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); | |||||
} | |||||
QUEUES[q->id] = q; | |||||
return enif_make_tuple2(env, mk_atom(env, "ok"), ret); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Free all the things! */ | |||||
QUEUES[queue_id] = NULL; | |||||
return enif_make_atom(env, "ok"); | |||||
} | |||||
/* Push to the head of the queue. */ | |||||
static ERL_NIF_TERM | |||||
queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
/* Load the queue */ | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
if (q->id != queue_id) | |||||
return mk_error(env, "not_identical_queue_id"); | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
fprintf(stderr, "queue slot %d, index %d, state %d\n", | |||||
i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]); | |||||
} | |||||
/* If there's consumers waiting, the queue must be empty and we | |||||
should directly pick a consumer to notify. */ | |||||
ErlNifPid *waiting_consumer; | |||||
int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer); | |||||
if (dequeue_ret) { | |||||
ErlNifEnv *msg_env = enif_alloc_env(); | |||||
ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]); | |||||
ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy); | |||||
if (enif_send(env, waiting_consumer, msg_env, tuple)) { | |||||
enif_free_env(msg_env); | |||||
return mk_atom(env, "ok"); | |||||
} else { | |||||
return mk_error(env, "notify_failed"); | |||||
} | |||||
} | |||||
/* Increment head and attempt to claim the slot by marking it as | |||||
busy. This ensures no other thread will attempt to modify this | |||||
slot. If we cannot lock it, another thread must have */ | |||||
uint64_t head = __sync_add_and_fetch(&q->head, 1); | |||||
size_t size = q->queue_size; | |||||
while (1) { | |||||
uint64_t index = SLOT_INDEX(head, size); | |||||
uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], | |||||
STATE_EMPTY, | |||||
STATE_WRITE); | |||||
switch (ret) { | |||||
case STATE_EMPTY: | |||||
head = __sync_add_and_fetch(&q->head, 1); | |||||
case STATE_WRITE: | |||||
/* We acquired the write lock, go ahead with the write. */ | |||||
break; | |||||
case STATE_FULL: | |||||
/* We have caught up with the tail and the buffer is | |||||
full. Block the producer until a consumer reads the | |||||
item. */ | |||||
return mk_error(env, "full_not_implemented"); | |||||
} | |||||
} | |||||
/* If head catches up with tail, the queue is full. Add to | |||||
overflow instead */ | |||||
/* Copy term to slot-specific temporary process env. */ | |||||
ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]); | |||||
q->slots_terms[SLOT_INDEX(head, size)] = copy; | |||||
__sync_synchronize(); /* Or compiler memory barrier? */ | |||||
/* TODO: Do we need to collect garbage? */ | |||||
/* Mark the slot ready to be consumed */ | |||||
if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)], | |||||
STATE_WRITE, | |||||
STATE_FULL)) { | |||||
return mk_atom(env, "ok"); | |||||
} else { | |||||
return mk_error(env, "could_not_update_slots_after_insert"); | |||||
} | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
/* Load queue */ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
if (q->id != queue_id) | |||||
return mk_error(env, "not_identical_queue_id"); | |||||
uint64_t qsize = q->queue_size; | |||||
uint64_t tail = q->tail; | |||||
uint64_t num_busy = 0; | |||||
/* Walk the buffer starting the tail position until we are either | |||||
able to consume a term or find an empty slot. */ | |||||
while (1) { | |||||
uint64_t index = SLOT_INDEX(tail, qsize); | |||||
uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], | |||||
STATE_FULL, | |||||
STATE_READ); | |||||
if (ret == STATE_READ) { | |||||
/* We were able to mark the term as read in progress. We | |||||
now have an exclusive lock. */ | |||||
break; | |||||
} else if (ret == STATE_WRITE) { | |||||
/* We found an item with a write in progress. If that | |||||
thread progresses, it will eventually mark the slot as | |||||
full. We can spin until that happens. | |||||
This can take an arbitrary amount of time and multiple | |||||
reading threads will compete for the same slot. | |||||
Instead we add the caller to the queue of blocking | |||||
consumers. When the next producer comes it will "help" | |||||
this thread by calling enif_send on the current | |||||
in-progress term *and* handle it's own terms. If | |||||
there's no new push to the queue, this will block | |||||
forever. */ | |||||
return mk_atom(env, "write_in_progress_not_implemented"); | |||||
} else if (ret == STATE_EMPTY) { | |||||
/* We found an empty item. Queue must be empty. Add | |||||
calling Erlang consumer process to queue of waiting | |||||
processes. When the next producer comes along, it first | |||||
checks the waiting consumers and calls enif_send | |||||
instead of writing to the slots. */ | |||||
ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid)); | |||||
pid = enif_self(env, pid); | |||||
enqueue(q->pop_queue, pid); | |||||
return mk_atom(env, "wait_for_msg"); | |||||
} else { | |||||
tail = __sync_add_and_fetch(&q->tail, 1); | |||||
} | |||||
} | |||||
/* Copy term into calling process env. The NIF env can now be | |||||
gargbage collected. */ | |||||
ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]); | |||||
/* Mark the slot as free. Note: We don't increment the tail | |||||
position here, as another thread also walking the buffer might | |||||
have incremented it multiple times */ | |||||
q->slots_terms[SLOT_INDEX(tail, qsize)] = 0; | |||||
if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)], | |||||
STATE_READ, | |||||
STATE_EMPTY)) { | |||||
return enif_make_tuple2(env, mk_atom(env, "ok"), copy); | |||||
} else { | |||||
return mk_error(env, "could_not_update_slots_after_pop"); | |||||
} | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); | |||||
if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { | |||||
slots_terms[i] = mk_atom(env, "null"); | |||||
} else { | |||||
slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); | |||||
} | |||||
} | |||||
return enif_make_tuple4(env, | |||||
enif_make_uint64(env, q->tail), | |||||
enif_make_uint64(env, q->head), | |||||
enif_make_list_from_array(env, slots_states, q->queue_size), | |||||
enif_make_list_from_array(env, slots_terms, q->queue_size)); | |||||
} | |||||
/* debug_poppers(QueueId) -> [pid() | null]
   Diagnostic NIF: walk the pop_queue (consumers parked waiting for a
   message) and return their pids. The traversal is not atomic with
   respect to concurrent enqueue/dequeue. */
static ERL_NIF_TERM
queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id > 8)
        return mk_error(env, "badarg");
    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");

    /* First pass: count nodes so the pid array can be sized. */
    uint64_t pop_queue_size = 0;
    cq_node_t *node = q->pop_queue->head;
    /* Skip the value-less dummy node at the head.
       NOTE(review): unlike the second pass below, the initial head is
       not run through Q_PTR before ->value is dereferenced — if head
       ever carries count bits this reads via a tagged pointer;
       confirm. */
    if (node->value == NULL) {
        node = node->next;
        node = Q_PTR(node);  /* strip the ABA-count tag bits */
    }
    while (node != NULL) {
        pop_queue_size++;
        node = node->next;
        node = Q_PTR(node);
    }

    /* Second pass: collect the pids into a temporary array.
       NOTE(review): if the queue changes between the two passes the
       count no longer matches — debug-only code, but racy. */
    ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size);
    node = q->pop_queue->head;
    node = Q_PTR(node);
    if (node->value == NULL) {
        node = node->next;
        node = Q_PTR(node);
    }
    uint64_t i = 0;
    while (node != NULL) {
        if (node->value == 0) {
            pop_queue_pids[i] = mk_atom(env, "null");
        }
        else {
            pop_queue_pids[i] = enif_make_pid(env, node->value);
        }
        i++;
        node = node->next;
        node = Q_PTR(node);
    }
    ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size);
    enif_free(pop_queue_pids);
    return list;
}
static ERL_NIF_TERM | |||||
print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint64_t *p1 = malloc(8); | |||||
*p1 = 0; | |||||
for (int bit = 63; bit >= 0; bit--) { | |||||
uint64_t power = 1 << bit; | |||||
//uint64_t byte = *p1; | |||||
uint64_t byte = p1; | |||||
fprintf(stderr, "%d", (byte & power) >> bit); | |||||
} | |||||
fprintf(stderr, "\n"); | |||||
//enif_free(p1); | |||||
return mk_atom(env, "ok"); | |||||
} | |||||
/* Destructor callback for CQ_RESOURCE handles. Currently it only
   logs; actual cleanup of cq_t internals is still TODO. */
void free_resource(ErlNifEnv* env, void* arg)
{
    (void) env;
    (void) arg;

    fprintf(stderr, "free_resource\n");
}
cq_queue_t * new_queue() | |||||
{ | |||||
cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); | |||||
cq_node_t *node = enif_alloc(sizeof(cq_node_t)); | |||||
node->next = NULL; | |||||
//node->env = NULL; | |||||
node->value = NULL; | |||||
queue->head = node; | |||||
queue->tail = node; | |||||
return queue; | |||||
} | |||||
/* Lock-free enqueue (Michael & Scott): append `pid` to `queue`. The
   low Q_MASK bits of the node pointers carry a small ABA counter.
   Debug tracing removed. */
void enqueue(cq_queue_t *queue, ErlNifPid *pid)
{
    cq_node_t *node = enif_alloc(sizeof(cq_node_t));
    node->value = pid;
    node->next = NULL;

    cq_node_t *tail = NULL;
    uint64_t tail_count = 0;

    while (1) {
        tail = queue->tail;
        cq_node_t *tail_ptr = Q_PTR(tail);
        tail_count = Q_COUNT(tail);
        /* BUG FIX: read through the untagged pointer — the old
           `tail->next` dereferenced the counter-tagged word. */
        cq_node_t *next = tail_ptr->next;
        cq_node_t *next_ptr = Q_PTR(next);
        uint64_t next_count = Q_COUNT(next);

        if (tail != queue->tail)
            continue;  /* tail moved under us; retry */

        if (next_ptr == NULL) {
            /* Tail really is last: try to link the new node.
               BUG FIX: only leave the loop when the CAS succeeds; the
               old `break` ran unconditionally after the if, silently
               dropping nodes under contention. */
            if (__sync_bool_compare_and_swap(&tail_ptr->next,
                                             next,
                                             Q_SET_COUNT(node, next_count+1)))
                break;
        } else {
            /* Tail is lagging: help swing it forward, then retry. */
            __sync_bool_compare_and_swap(&queue->tail,
                                         tail,
                                         Q_SET_COUNT(next_ptr, next_count+1));
        }
    }

    /* Swing tail to the new node; failure is fine (another thread
       already helped). */
    __sync_bool_compare_and_swap(&queue->tail,
                                 tail,
                                 Q_SET_COUNT(node, tail_count+1));
}
/* Lock-free dequeue (Michael & Scott). On success stores the oldest
   pid in *pid and returns 1; returns 0 when the queue is empty.
   Debug tracing removed. */
int dequeue(cq_queue_t *queue, ErlNifPid **pid)
{
    cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr;

    while (1) {
        head = queue->head;
        head_ptr = Q_PTR(head);
        tail = queue->tail;
        tail_ptr = Q_PTR(tail);
        /* BUG FIX: read through the untagged pointer — the old
           `head->next` dereferenced the counter-tagged word. */
        next = head_ptr->next;
        next_ptr = Q_PTR(next);

        if (head != queue->head)
            continue;  /* snapshot is stale; retry */

        if (head_ptr == tail_ptr) {
            if (next_ptr == NULL) {
                return 0; /* Queue is empty */
            }
            /* Tail lags behind; help swing it forward. */
            __sync_bool_compare_and_swap(&queue->tail,
                                         tail,
                                         Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1));
        } else {
            /* Read the value before the CAS: after a successful swing
               of head another dequeuer may free the node. */
            *pid = next_ptr->value;
            if (__sync_bool_compare_and_swap(&queue->head,
                                             head,
                                             Q_SET_COUNT(next_ptr, Q_COUNT(head)+1)))
                break;
        }
    }

    /* NOTE(review): the detached dummy node (Q_PTR(head)) is never
       freed — leaks until a safe memory-reclamation scheme exists. */
    return 1;
}
/* NIF load callback: create the global queue registry and register
   the resource type used for queue handles. Returns 0 on success,
   -1 to make the load fail. */
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) {
    /* Initialize global array mapping id to cq_t ptr.
       BUG FIX: queue ids are validated elsewhere with `id > 8`, i.e.
       ids 0..8 inclusive are accepted, so the table needs 9 entries
       (was 8 — out-of-bounds access at id 8). Element type is
       cq_t *, not cq_t ** (same size, but correct semantics). */
    QUEUES = (cq_t **) calloc(9, sizeof(cq_t *));
    if (QUEUES == NULL)
        return -1;
    ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
    CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq",
                                          &free_resource, flags, NULL);
    if (CQ_RESOURCE == NULL)
        return -1;
    return 0;
}
/* NIF dispatch table: {erlang_name, arity, c_implementation}. */
static ErlNifFunc nif_funcs[] = {
    {"new" , 3, queue_new},
    {"free" , 1, queue_free},
    {"push" , 2, queue_push},
    {"async_pop", 1, queue_async_pop},
    {"debug" , 1, queue_debug},
    {"debug_poppers", 1, queue_debug_poppers},
    {"print_bits", 0, print_bits}
};

/* Register Erlang module `cq`; only a load callback is provided
   (no reload/upgrade/unload handlers). */
ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL);
@ -0,0 +1,71 @@ | |||||
#include <stdint.h>
#include "erl_nif.h"

/* Slot bookkeeping is spread one entry per cache line to avoid false
   sharing between producer and consumer threads. */
#define CACHE_LINE_SIZE 64

/* Ring-buffer index for a monotonically increasing position; requires
   __size to be a power of two.
   BUG FIX: fully parenthesized so the expansion is safe inside larger
   expressions (e.g. SLOT_INDEX(h + 1, s) previously expanded to
   `h + 1 & (s - 1)`, binding `&` after `+`). */
#define SLOT_INDEX(__index, __size) ((__index) & ((__size) - 1))

/* The two low bits of queue-node pointers hold a modification counter
   (ABA tag): Q_PTR strips it, Q_COUNT extracts it, Q_SET_COUNT embeds
   it. All fully parenthesized for the same reason as above. */
#define Q_MASK 3L
#define Q_PTR(__ptr) ((cq_node_t *) (((uint64_t) (__ptr)) & (~Q_MASK)))
#define Q_COUNT(__ptr) (((uint64_t) (__ptr)) & Q_MASK)
#define Q_SET_COUNT(__ptr, __val) ((cq_node_t *) (((uint64_t) (__ptr)) | ((__val) & Q_MASK)))

/* Slot life-cycle: EMPTY -> WRITE -> FULL -> READ -> EMPTY. */
#define STATE_EMPTY 0
#define STATE_WRITE 1
#define STATE_READ  2
#define STATE_FULL  3

/* NOTE(review): non-static object definitions in a header cause
   duplicate-symbol link errors if this file is ever included from a
   second translation unit; fine while only cq_nif.c includes it. */
ErlNifResourceType* CQ_RESOURCE;

typedef struct cq_node cq_node_t;

/* Singly-linked queue node; `value` is NULL for the dummy head node. */
struct cq_node {
    ErlNifEnv *env;
    ErlNifPid *value;
    cq_node_t *next;
};

/* Two-pointer queue of parked producer/consumer pids (Michael &
   Scott style, with a dummy head node). */
typedef struct cq_queue {
    cq_node_t *head;
    cq_node_t *tail;
} cq_queue_t;

// TODO: Add padding between the fields
typedef struct cq {
    uint32_t id;                 /* index into QUEUES */
    uint64_t queue_size;         /* number of ring slots (power of 2) */
    uint64_t overflow_size;      /* capacity of the overflow area */
    uint64_t head;               /* producer position (monotonic) */
    uint64_t tail;               /* consumer position (monotonic) */
    uint8_t *slots_states;       /* one STATE_* entry per cache line */
    ERL_NIF_TERM *slots_terms;   /* queued terms */
    ErlNifEnv **slots_envs;      /* per-slot envs owning the terms */
    cq_queue_t *push_queue;      /* producers blocked on a full queue */
    cq_queue_t *pop_queue;       /* consumers blocked on an empty queue */
    uint8_t *overflow_states;
    ERL_NIF_TERM *overflow_terms;
    ErlNifEnv **overflow_envs;
} cq_t;

/* Global id -> queue registry. */
cq_t **QUEUES = NULL; /* Initialized on nif load */

ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom);
ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg);
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
void free_resource(ErlNifEnv*, void*);
cq_queue_t* new_queue(void);
void enqueue(cq_queue_t *q, ErlNifPid *pid);
@ -0,0 +1,73 @@ | |||||
# Based on c_src.mk from erlang.mk by Loic Hoguin <essen@ninenines.eu>
# Compiles every C/C++ file in this directory into one NIF shared
# object under ../../priv/, named after the directory (PROJECT).
CURDIR := $(shell pwd)
BASEDIR := $(abspath $(CURDIR))
PROJECT ?= $(notdir $(BASEDIR))
PROJECT := $(strip $(PROJECT))
# Ask the `erl` in PATH where its ERTS and erl_interface trees live.
ERTS_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).")
ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).")
ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).")
C_SRC_DIR = $(CURDIR)
C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so
# System type and C compiler/flags.
UNAME_SYS := $(shell uname -s)
ifeq ($(UNAME_SYS), Darwin)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall
LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress
else ifeq ($(UNAME_SYS), FreeBSD)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
else ifeq ($(UNAME_SYS), Linux)
CC ?= gcc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
endif
CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
# NOTE(review): liberl_interface was removed in OTP 23; drop
# -lerl_interface (keep -lei) when targeting modern OTP releases.
LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei
LDFLAGS += -shared
# Verbosity: V=0 (default) prints terse C/CPP/LD lines; any other V
# echoes the full command line.
c_verbose_0 = @echo " C " $(?F);
c_verbose = $(c_verbose_$(V))
cpp_verbose_0 = @echo " CPP " $(?F);
cpp_verbose = $(cpp_verbose_$(V))
link_verbose_0 = @echo " LD " $(@F);
link_verbose = $(link_verbose_$(V))
SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \))
OBJECTS = $(addsuffix .o, $(basename $(SOURCES)))
COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c
COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c
$(C_SRC_OUTPUT): $(OBJECTS)
	@mkdir -p $(dir $(C_SRC_OUTPUT))
	$(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT)
%.o: %.c
	$(COMPILE_C) $(OUTPUT_OPTION) $<
%.o: %.cc
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
%.o: %.C
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
%.o: %.cpp
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<
# clean is a command, not a file: declare it phony so a stray file
# named "clean" cannot break it.
.PHONY: clean
clean:
	@rm -f $(C_SRC_OUTPUT) $(OBJECTS)
@ -0,0 +1,564 @@ | |||||
#include <stdio.h> | |||||
#include <unistd.h> | |||||
#include "erl_nif.h" | |||||
#include "cq_nif.h" | |||||
/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT | |||||
# error Requires dirty schedulers | |||||
#endif */ | |||||
ERL_NIF_TERM | |||||
mk_atom(ErlNifEnv* env, const char* atom) | |||||
{ | |||||
ERL_NIF_TERM ret; | |||||
if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) | |||||
return enif_make_atom(env, atom); | |||||
return ret; | |||||
} | |||||
ERL_NIF_TERM | |||||
mk_error(ErlNifEnv* env, const char* mesg) | |||||
{ | |||||
return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); | |||||
if (q == NULL) | |||||
return mk_error(env, "priv_alloc_error"); | |||||
ERL_NIF_TERM ret = enif_make_resource(env, q); | |||||
/* enif_release_resource(ret); */ | |||||
uint32_t queue_id = 0; | |||||
uint32_t queue_size = 0; | |||||
uint32_t overflow_size = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id) || | |||||
!enif_get_uint(env, argv[1], &queue_size) || | |||||
!enif_get_uint(env, argv[2], &overflow_size)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Check that queue_size is power of 2 */ | |||||
if (QUEUES[queue_id] != NULL) | |||||
return mk_error(env, "queue_id_already_exists"); | |||||
q->id = queue_id; | |||||
q->queue_size = queue_size; | |||||
q->overflow_size = overflow_size; | |||||
q->tail = 0; | |||||
q->head = 0; | |||||
q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); | |||||
q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->push_queue = new_queue(); | |||||
q->pop_queue = new_queue(); | |||||
/* TODO: Check calloc return */ | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
ErlNifEnv *slot_env = enif_alloc_env(); | |||||
q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; | |||||
//q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); | |||||
} | |||||
QUEUES[q->id] = q; | |||||
return enif_make_tuple2(env, mk_atom(env, "ok"), ret); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Free all the things! */ | |||||
QUEUES[queue_id] = NULL; | |||||
return enif_make_atom(env, "ok"); | |||||
} | |||||
/* Push to the head of the queue. */ | |||||
static ERL_NIF_TERM | |||||
queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
/* Load the queue */ | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
if (q->id != queue_id) | |||||
return mk_error(env, "not_identical_queue_id"); | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
fprintf(stderr, "queue slot %d, index %d, state %d\n", | |||||
i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]); | |||||
} | |||||
/* If there's consumers waiting, the queue must be empty and we | |||||
should directly pick a consumer to notify. */ | |||||
ErlNifPid *waiting_consumer; | |||||
int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer); | |||||
if (dequeue_ret) { | |||||
ErlNifEnv *msg_env = enif_alloc_env(); | |||||
ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]); | |||||
ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy); | |||||
if (enif_send(env, waiting_consumer, msg_env, tuple)) { | |||||
enif_free_env(msg_env); | |||||
return mk_atom(env, "ok"); | |||||
} else { | |||||
return mk_error(env, "notify_failed"); | |||||
} | |||||
} | |||||
/* Increment head and attempt to claim the slot by marking it as | |||||
busy. This ensures no other thread will attempt to modify this | |||||
slot. If we cannot lock it, another thread must have */ | |||||
uint64_t head = __sync_add_and_fetch(&q->head, 1); | |||||
size_t size = q->queue_size; | |||||
while (1) { | |||||
uint64_t index = SLOT_INDEX(head, size); | |||||
uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], | |||||
STATE_EMPTY, | |||||
STATE_WRITE); | |||||
switch (ret) { | |||||
case STATE_EMPTY: | |||||
head = __sync_add_and_fetch(&q->head, 1); | |||||
case STATE_WRITE: | |||||
/* We acquired the write lock, go ahead with the write. */ | |||||
break; | |||||
case STATE_FULL: | |||||
/* We have caught up with the tail and the buffer is | |||||
full. Block the producer until a consumer reads the | |||||
item. */ | |||||
return mk_error(env, "full_not_implemented"); | |||||
} | |||||
} | |||||
/* If head catches up with tail, the queue is full. Add to | |||||
overflow instead */ | |||||
/* Copy term to slot-specific temporary process env. */ | |||||
ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]); | |||||
q->slots_terms[SLOT_INDEX(head, size)] = copy; | |||||
__sync_synchronize(); /* Or compiler memory barrier? */ | |||||
/* TODO: Do we need to collect garbage? */ | |||||
/* Mark the slot ready to be consumed */ | |||||
if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)], | |||||
STATE_WRITE, | |||||
STATE_FULL)) { | |||||
return mk_atom(env, "ok"); | |||||
} else { | |||||
return mk_error(env, "could_not_update_slots_after_insert"); | |||||
} | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
/* Load queue */ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
if (q->id != queue_id) | |||||
return mk_error(env, "not_identical_queue_id"); | |||||
uint64_t qsize = q->queue_size; | |||||
uint64_t tail = q->tail; | |||||
uint64_t num_busy = 0; | |||||
/* Walk the buffer starting the tail position until we are either | |||||
able to consume a term or find an empty slot. */ | |||||
while (1) { | |||||
uint64_t index = SLOT_INDEX(tail, qsize); | |||||
uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index], | |||||
STATE_FULL, | |||||
STATE_READ); | |||||
if (ret == STATE_READ) { | |||||
/* We were able to mark the term as read in progress. We | |||||
now have an exclusive lock. */ | |||||
break; | |||||
} else if (ret == STATE_WRITE) { | |||||
/* We found an item with a write in progress. If that | |||||
thread progresses, it will eventually mark the slot as | |||||
full. We can spin until that happens. | |||||
This can take an arbitrary amount of time and multiple | |||||
reading threads will compete for the same slot. | |||||
Instead we add the caller to the queue of blocking | |||||
consumers. When the next producer comes it will "help" | |||||
this thread by calling enif_send on the current | |||||
in-progress term *and* handle it's own terms. If | |||||
there's no new push to the queue, this will block | |||||
forever. */ | |||||
return mk_atom(env, "write_in_progress_not_implemented"); | |||||
} else if (ret == STATE_EMPTY) { | |||||
/* We found an empty item. Queue must be empty. Add | |||||
calling Erlang consumer process to queue of waiting | |||||
processes. When the next producer comes along, it first | |||||
checks the waiting consumers and calls enif_send | |||||
instead of writing to the slots. */ | |||||
ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid)); | |||||
pid = enif_self(env, pid); | |||||
enqueue(q->pop_queue, pid); | |||||
return mk_atom(env, "wait_for_msg"); | |||||
} else { | |||||
tail = __sync_add_and_fetch(&q->tail, 1); | |||||
} | |||||
} | |||||
/* Copy term into calling process env. The NIF env can now be | |||||
gargbage collected. */ | |||||
ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]); | |||||
/* Mark the slot as free. Note: We don't increment the tail | |||||
position here, as another thread also walking the buffer might | |||||
have incremented it multiple times */ | |||||
q->slots_terms[SLOT_INDEX(tail, qsize)] = 0; | |||||
if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)], | |||||
STATE_READ, | |||||
STATE_EMPTY)) { | |||||
return enif_make_tuple2(env, mk_atom(env, "ok"), copy); | |||||
} else { | |||||
return mk_error(env, "could_not_update_slots_after_pop"); | |||||
} | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); | |||||
if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { | |||||
slots_terms[i] = mk_atom(env, "null"); | |||||
} else { | |||||
slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); | |||||
} | |||||
} | |||||
return enif_make_tuple4(env, | |||||
enif_make_uint64(env, q->tail), | |||||
enif_make_uint64(env, q->head), | |||||
enif_make_list_from_array(env, slots_states, q->queue_size), | |||||
enif_make_list_from_array(env, slots_terms, q->queue_size)); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
uint64_t pop_queue_size = 0; | |||||
cq_node_t *node = q->pop_queue->head; | |||||
if (node->value == NULL) { | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
while (node != NULL) { | |||||
pop_queue_size++; | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size); | |||||
node = q->pop_queue->head; | |||||
node = Q_PTR(node); | |||||
if (node->value == NULL) { | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
uint64_t i = 0; | |||||
while (node != NULL) { | |||||
if (node->value == 0) { | |||||
pop_queue_pids[i] = mk_atom(env, "null"); | |||||
} | |||||
else { | |||||
pop_queue_pids[i] = enif_make_pid(env, node->value); | |||||
} | |||||
i++; | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size); | |||||
enif_free(pop_queue_pids); | |||||
return list; | |||||
} | |||||
static ERL_NIF_TERM | |||||
print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint64_t *p1 = malloc(8); | |||||
*p1 = 0; | |||||
for (int bit = 63; bit >= 0; bit--) { | |||||
uint64_t power = 1 << bit; | |||||
//uint64_t byte = *p1; | |||||
uint64_t byte = p1; | |||||
fprintf(stderr, "%d", (byte & power) >> bit); | |||||
} | |||||
fprintf(stderr, "\n"); | |||||
//enif_free(p1); | |||||
return mk_atom(env, "ok"); | |||||
} | |||||
/* Destructor callback for "cq" resources. Actual cleanup is not
   implemented yet; this only logs that the callback fired. */
void free_resource(ErlNifEnv* env, void* arg)
{
    //cq_t *cq = (cq_t *) arg;
    fputs("free_resource\n", stderr);
}
cq_queue_t * new_queue() | |||||
{ | |||||
cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); | |||||
cq_node_t *node = enif_alloc(sizeof(cq_node_t)); | |||||
node->next = NULL; | |||||
//node->env = NULL; | |||||
node->value = NULL; | |||||
queue->head = node; | |||||
queue->tail = node; | |||||
return queue; | |||||
} | |||||
/* Append 'pid' to a waiting-process queue.
   Michael & Scott lock-free enqueue; the low bits of each pointer
   carry a small version count (Q_COUNT) as weak ABA protection.
   Fixes vs. the original:
     - the 'break' sat outside the if-body and ran even when the link
       CAS failed, abandoning the retry loop and losing the node under
       contention;
     - 'tail->next' dereferenced the *counted* pointer; the count bits
       must be stripped (Q_PTR) before dereferencing;
     - debug fprintf calls (which printed pointers with %lu, undefined
       behavior) removed. */
void enqueue(cq_queue_t *queue, ErlNifPid *pid)
{
    cq_node_t *node = enif_alloc(sizeof(cq_node_t));
    node->env = NULL;
    node->value = pid;
    node->next = NULL;
    cq_node_t *tail = NULL;
    uint64_t tail_count = 0;
    while (1) {
        tail = queue->tail;
        cq_node_t *tail_ptr = Q_PTR(tail);
        tail_count = Q_COUNT(tail);
        cq_node_t *next = tail_ptr->next;
        cq_node_t *next_ptr = Q_PTR(next);
        uint64_t next_count = Q_COUNT(next);
        if (tail == queue->tail) {
            if (next_ptr == NULL) {
                /* Tail really points at the last node: try to link the
                   new node in. Leave the loop only on success. */
                if (__sync_bool_compare_and_swap(&tail_ptr->next,
                                                 next,
                                                 Q_SET_COUNT(node, next_count+1)))
                    break;
            } else {
                /* Tail is lagging behind; help swing it forward, then
                   retry. */
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, next_count+1));
            }
        }
    }
    /* Swing tail to the freshly linked node; failure means another
       thread already helped, which is fine. */
    __sync_bool_compare_and_swap(&queue->tail,
                                 tail,
                                 Q_SET_COUNT(node, tail_count+1));
}
/* Pop one waiting pid from 'queue' into *pid.
   Michael & Scott lock-free dequeue with version counts packed into
   the low pointer bits (Q_COUNT). Returns 0 if the queue is empty
   (only the dummy node present), 1 on success.
   NOTE(review): 'head->next' below dereferences the counted pointer
   without Q_PTR -- crashes if the count bits are ever nonzero on
   head; confirm against enqueue's counting scheme.
   NOTE(review): the dequeued node (and the pid it carries) is never
   freed -- see the commented-out enif_free at the bottom. */
int dequeue(cq_queue_t *queue, ErlNifPid **pid)
{
    fprintf(stderr, "dequeue\n");
    cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr;
    while (1) {
        /* Re-read the queue ends each attempt; retry whenever a
           concurrent thread moved them under us. */
        head = queue->head;
        head_ptr = Q_PTR(head);
        tail = queue->tail;
        tail_ptr = Q_PTR(tail);
        next = head->next;
        next_ptr = Q_PTR(next);
        /* NOTE(review): %lu with pointer arguments is undefined
           behavior; should be %p. Debug output only. */
        fprintf(stderr, "head %lu, tail %lu, next %lu\n", head, tail, next);
        if (head == queue->head) {
            if (head_ptr == tail_ptr) {
                if (next_ptr == NULL) {
                    return 0; /* Queue is empty */
                }
                /* Tail is lagging; help swing it forward and retry. */
                fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n");
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1));
            } else {
                /* Read the value before the CAS: after a successful
                   CAS another dequeuer may free/reuse the node. */
                fprintf(stderr, "next->value %lu\n", next_ptr->value);
                *pid = next_ptr->value;
                fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n");
                if (__sync_bool_compare_and_swap(&queue->head,
                                                 head,
                                                 Q_SET_COUNT(next_ptr, Q_COUNT(head)+1)))
                    break;
            }
        }
    }
    // free pid
    //enif_free(Q_PTR(head));
    return 1;
}
/* NIF load callback: allocate the global id->queue table and register
   the "cq" resource type. Returns 0 on success, -1 on failure. */
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) {
    /* Global array mapping queue id to cq_t pointer: 8 slots, so
       valid queue ids are 0..7. (Was sizeof(cq_t **) -- same size on
       every real platform, but the element type is cq_t *.) */
    QUEUES = (cq_t **) calloc(8, sizeof(cq_t *));
    if (QUEUES == NULL)
        return -1;
    ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
    CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq",
                                          &free_resource, flags, NULL);
    if (CQ_RESOURCE == NULL)
        return -1;
    return 0;
}
/* NIF dispatch table: {erlang_function_name, arity, c_function}.
   Names and arities must match the stubs in the Erlang 'cq' module. */
static ErlNifFunc nif_funcs[] = {
    {"new"      , 3, queue_new},
    {"free"     , 1, queue_free},
    {"push"     , 2, queue_push},
    {"async_pop", 1, queue_async_pop},
    {"debug"    , 1, queue_debug},
    {"debug_poppers", 1, queue_debug_poppers},
    {"print_bits", 0, print_bits}
};

/* Register the NIFs for module 'cq'. 'load' runs at load time; the
   reload/upgrade/unload hooks are unused (NULL). */
ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL);
@ -0,0 +1,71 @@ | |||||
#include <stdint.h>
#include "erl_nif.h"

/* One cache line (bytes) per slot, used to space out the shared
   arrays and avoid false sharing between cores. */
#define CACHE_LINE_SIZE 64

/* Map a monotonically increasing position onto a ring-buffer index.
   Requires __size to be a power of two. Fully parenthesized: the
   original expanded to '__index & (__size - 1)' unparenthesized and
   was unsafe inside larger expressions. */
#define SLOT_INDEX(__index, __size) ((__index) & ((__size) - 1))

/* The low two bits of a cq_node_t* carry a version counter used to
   reduce ABA problems in the lock-free queues; nodes must therefore
   be at least 4-byte aligned. All macros fully parenthesized. */
#define Q_MASK 3L
#define Q_PTR(__ptr) ((cq_node_t *) (((uint64_t)(__ptr)) & (~Q_MASK)))
#define Q_COUNT(__ptr) (((uint64_t)(__ptr)) & Q_MASK)
#define Q_SET_COUNT(__ptr, __val) ((cq_node_t *) (((uint64_t)(__ptr)) | ((__val) & Q_MASK)))

/* Slot lifecycle: EMPTY -> WRITE (producer owns) -> FULL ->
   READ (consumer owns) -> EMPTY. */
#define STATE_EMPTY 0
#define STATE_WRITE 1
#define STATE_READ 2
#define STATE_FULL 3

/* NOTE(review): CQ_RESOURCE and QUEUES are *definitions*, not extern
   declarations; including this header from more than one translation
   unit will fail to link. Consider 'extern' here plus a single
   definition in the .c file. */
ErlNifResourceType* CQ_RESOURCE;

typedef struct cq_node cq_node_t;

/* Node in a waiting-process queue. */
struct cq_node {
    ErlNifEnv *env;
    //ERL_NIF_TERM term;
    ErlNifPid *value;   /* NULL for the dummy node */
    cq_node_t *next;
};

/* Michael&Scott-style queue of waiting Erlang processes. */
typedef struct cq_queue {
    cq_node_t *head;
    cq_node_t *tail;
} cq_queue_t;

// TODO: Add padding between the fields
typedef struct cq {
    uint32_t id;              /* index into QUEUES */
    uint64_t queue_size;      /* number of ring slots (power of two) */
    uint64_t overflow_size;
    uint64_t head;            /* producer position */
    uint64_t tail;            /* consumer position */
    uint8_t *slots_states;    /* STATE_* per slot */
    ERL_NIF_TERM *slots_terms;
    ErlNifEnv **slots_envs;   /* per-slot envs owning slots_terms */
    cq_queue_t *push_queue;   /* producers blocked on a full queue */
    cq_queue_t *pop_queue;    /* consumers blocked on an empty queue */
    uint8_t *overflow_states;
    ERL_NIF_TERM *overflow_terms;
    ErlNifEnv **overflow_envs;
} cq_t;

cq_t **QUEUES = NULL; /* Initialized on nif load */

ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom);
ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg);
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
void free_resource(ErlNifEnv*, void*);
cq_queue_t* new_queue(void);
void enqueue(cq_queue_t *q, ErlNifPid *pid);
@ -0,0 +1,73 @@ | |||||
# Based on c_src.mk from erlang.mk by Loic Hoguin <essen@ninenines.eu>

CURDIR := $(shell pwd)
BASEDIR := $(abspath $(CURDIR))

PROJECT ?= $(notdir $(BASEDIR))
PROJECT := $(strip $(PROJECT))

# Ask the local Erlang installation where its headers and libs live.
# Evaluate these expensive `erl` shell-outs once, and only when the
# caller did not supply a value ('?=' keeps the recursive flavour,
# which would re-run erl on every expansion of CFLAGS/LDLIBS).
ifeq ($(origin ERTS_INCLUDE_DIR), undefined)
ERTS_INCLUDE_DIR := $(shell erl -noshell -s init stop -eval "io:format(\"~s/erts-~s/include/\", [code:root_dir(), erlang:system_info(version)]).")
endif
ifeq ($(origin ERL_INTERFACE_INCLUDE_DIR), undefined)
ERL_INTERFACE_INCLUDE_DIR := $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, include)]).")
endif
ifeq ($(origin ERL_INTERFACE_LIB_DIR), undefined)
ERL_INTERFACE_LIB_DIR := $(shell erl -noshell -s init stop -eval "io:format(\"~s\", [code:lib_dir(erl_interface, lib)]).")
endif

C_SRC_DIR = $(CURDIR)
C_SRC_OUTPUT ?= $(CURDIR)/../../priv/$(PROJECT).so

# System type and C compiler/flags.
UNAME_SYS := $(shell uname -s)
ifeq ($(UNAME_SYS), Darwin)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall
LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress
else ifeq ($(UNAME_SYS), FreeBSD)
CC ?= cc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
else ifeq ($(UNAME_SYS), Linux)
CC ?= gcc
CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes
CXXFLAGS ?= -O3 -finline-functions -Wall
endif

CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)
CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR)

# NOTE(review): erl_interface (-lerl_interface) was removed in OTP 23;
# on modern OTP only -lei exists. Confirm the targeted OTP version.
LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lerl_interface -lei
LDFLAGS += -shared

# Verbosity: with V unset, hide command lines behind short tags.
# Use $(<F) -- the file actually being compiled -- rather than $(?F),
# which lists only out-of-date prerequisites and can be empty.
c_verbose_0 = @echo " C " $(<F);
c_verbose = $(c_verbose_$(V))

cpp_verbose_0 = @echo " CPP " $(<F);
cpp_verbose = $(cpp_verbose_$(V))

link_verbose_0 = @echo " LD " $(@F);
link_verbose = $(link_verbose_$(V))

SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \))
OBJECTS = $(addsuffix .o, $(basename $(SOURCES)))

COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c
COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c

$(C_SRC_OUTPUT): $(OBJECTS)
	@mkdir -p $(dir $(C_SRC_OUTPUT))
	$(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT)

%.o: %.c
	$(COMPILE_C) $(OUTPUT_OPTION) $<

%.o: %.cc
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<

%.o: %.C
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<

%.o: %.cpp
	$(COMPILE_CPP) $(OUTPUT_OPTION) $<

# 'clean' is a command, not a file; declare it phony so a file named
# "clean" can never mask it.
.PHONY: clean
clean:
	@$(RM) $(C_SRC_OUTPUT) $(OBJECTS)
@ -0,0 +1,564 @@ | |||||
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "erl_nif.h"
#include "cq_nif.h"
/* #ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT | |||||
# error Requires dirty schedulers | |||||
#endif */ | |||||
ERL_NIF_TERM | |||||
mk_atom(ErlNifEnv* env, const char* atom) | |||||
{ | |||||
ERL_NIF_TERM ret; | |||||
if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) | |||||
return enif_make_atom(env, atom); | |||||
return ret; | |||||
} | |||||
ERL_NIF_TERM | |||||
mk_error(ErlNifEnv* env, const char* mesg) | |||||
{ | |||||
return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg)); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t)); | |||||
if (q == NULL) | |||||
return mk_error(env, "priv_alloc_error"); | |||||
ERL_NIF_TERM ret = enif_make_resource(env, q); | |||||
/* enif_release_resource(ret); */ | |||||
uint32_t queue_id = 0; | |||||
uint32_t queue_size = 0; | |||||
uint32_t overflow_size = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id) || | |||||
!enif_get_uint(env, argv[1], &queue_size) || | |||||
!enif_get_uint(env, argv[2], &overflow_size)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Check that queue_size is power of 2 */ | |||||
if (QUEUES[queue_id] != NULL) | |||||
return mk_error(env, "queue_id_already_exists"); | |||||
q->id = queue_id; | |||||
q->queue_size = queue_size; | |||||
q->overflow_size = overflow_size; | |||||
q->tail = 0; | |||||
q->head = 0; | |||||
q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE); | |||||
q->overflow_envs = calloc(q->queue_size, CACHE_LINE_SIZE); | |||||
q->push_queue = new_queue(); | |||||
q->pop_queue = new_queue(); | |||||
/* TODO: Check calloc return */ | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
ErlNifEnv *slot_env = enif_alloc_env(); | |||||
q->slots_envs[i*CACHE_LINE_SIZE] = slot_env; | |||||
//q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env(); | |||||
} | |||||
QUEUES[q->id] = q; | |||||
return enif_make_tuple2(env, mk_atom(env, "ok"), ret); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
/* TODO: Free all the things! */ | |||||
QUEUES[queue_id] = NULL; | |||||
return enif_make_atom(env, "ok"); | |||||
} | |||||
/* NIF: push(QueueId, Term) -> ok | {error, _}.
   Push to the head of the queue. If a consumer is blocked waiting
   (pop_queue non-empty) the term is sent to it directly; otherwise a
   ring-buffer slot is claimed and the term copied into it.
   NOTE(review): several review notes below -- this function appears
   unfinished. */
static ERL_NIF_TERM
queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    /* NOTE(review): QUEUES has 8 entries; '> 8' permits id 8, one
       past the end of the table. Should be '>= 8'. */
    if (queue_id > 8)
        return mk_error(env, "badarg");
    /* Load the queue */
    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");
    if (q->id != queue_id)
        return mk_error(env, "not_identical_queue_id");
    /* Leftover debug dump of all slot states on every push. */
    for (int i = 0; i < q->queue_size; i++) {
        fprintf(stderr, "queue slot %d, index %d, state %d\n",
                i, i*CACHE_LINE_SIZE, q->slots_states[i*CACHE_LINE_SIZE]);
    }
    /* If there's consumers waiting, the queue must be empty and we
       should directly pick a consumer to notify. */
    ErlNifPid *waiting_consumer;
    int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer);
    if (dequeue_ret) {
        ErlNifEnv *msg_env = enif_alloc_env();
        ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]);
        ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy);
        if (enif_send(env, waiting_consumer, msg_env, tuple)) {
            enif_free_env(msg_env);
            return mk_atom(env, "ok");
        } else {
            /* NOTE(review): msg_env leaks on this path. */
            return mk_error(env, "notify_failed");
        }
    }
    /* Increment head and attempt to claim the slot by marking it as
       busy. This ensures no other thread will attempt to modify this
       slot. */
    uint64_t head = __sync_add_and_fetch(&q->head, 1);
    size_t size = q->queue_size;
    while (1) {
        uint64_t index = SLOT_INDEX(head, size);
        /* __sync_val_compare_and_swap returns the slot's *previous*
           value: STATE_EMPTY means the CAS succeeded and we now own
           the slot in STATE_WRITE. */
        uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index],
                                                   STATE_EMPTY,
                                                   STATE_WRITE);
        switch (ret) {
        case STATE_EMPTY:
            /* NOTE(review): this is the *success* case, yet it bumps
               head (abandoning the slot just locked as STATE_WRITE)
               and falls through to 'break', which only exits the
               switch -- there is no path out of the while(1) except
               the STATE_FULL return. Looks like the success case
               should exit the loop instead. */
            head = __sync_add_and_fetch(&q->head, 1);
        case STATE_WRITE:
            /* NOTE(review): ret == STATE_WRITE actually means another
               producer owns the slot (CAS failed), contrary to the
               original comment here. */
            break;
        case STATE_FULL:
            /* We have caught up with the tail and the buffer is
               full. Block the producer until a consumer reads the
               item. */
            return mk_error(env, "full_not_implemented");
        }
    }
    /* If head catches up with tail, the queue is full. Add to
       overflow instead */
    /* Copy term to slot-specific temporary process env.
       NOTE(review): unreachable given the loop above never breaks. */
    ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]);
    q->slots_terms[SLOT_INDEX(head, size)] = copy;
    __sync_synchronize(); /* Or compiler memory barrier? */
    /* TODO: Do we need to collect garbage? */
    /* Mark the slot ready to be consumed */
    if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)],
                                     STATE_WRITE,
                                     STATE_FULL)) {
        return mk_atom(env, "ok");
    } else {
        return mk_error(env, "could_not_update_slots_after_insert");
    }
}
/* NIF: async_pop(QueueId) -> {ok, Term} | wait_for_msg | {error, _}.
   Attempt to consume the term at the tail of the ring buffer. If the
   queue is empty, the calling process is enqueued on pop_queue and
   will later receive {pop, Term} from a producer (see queue_push). */
static ERL_NIF_TERM
queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    /* Load queue */
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    /* NOTE(review): QUEUES has 8 entries; should be '>= 8'. */
    if (queue_id > 8)
        return mk_error(env, "badarg");
    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");
    if (q->id != queue_id)
        return mk_error(env, "not_identical_queue_id");
    uint64_t qsize = q->queue_size;
    uint64_t tail = q->tail;
    uint64_t num_busy = 0;
    /* Walk the buffer starting the tail position until we are either
       able to consume a term or find an empty slot. */
    while (1) {
        uint64_t index = SLOT_INDEX(tail, qsize);
        /* __sync_val_compare_and_swap returns the *previous* value:
           STATE_FULL means the CAS succeeded (slot is now ours in
           STATE_READ). */
        uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index],
                                                   STATE_FULL,
                                                   STATE_READ);
        if (ret == STATE_READ) {
            /* NOTE(review): ret == STATE_READ means *another* consumer
               already holds this slot, contrary to the original
               comment; the successful-CAS previous value would be
               STATE_FULL, which instead falls into the final 'else'
               below. The success and contention paths look swapped --
               confirm before relying on this function. */
            break;
        } else if (ret == STATE_WRITE) {
            /* We found an item with a write in progress. If that
               thread progresses, it will eventually mark the slot as
               full. We can spin until that happens.
               This can take an arbitrary amount of time and multiple
               reading threads will compete for the same slot.
               Instead we add the caller to the queue of blocking
               consumers. When the next producer comes it will "help"
               this thread by calling enif_send on the current
               in-progress term *and* handle it's own terms. If
               there's no new push to the queue, this will block
               forever. */
            return mk_atom(env, "write_in_progress_not_implemented");
        } else if (ret == STATE_EMPTY) {
            /* We found an empty item. Queue must be empty. Add
               calling Erlang consumer process to queue of waiting
               processes. When the next producer comes along, it first
               checks the waiting consumers and calls enif_send
               instead of writing to the slots.
               NOTE(review): the pid is enif_alloc'd here and never
               freed after the producer consumes it. */
            ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid));
            pid = enif_self(env, pid);
            enqueue(q->pop_queue, pid);
            return mk_atom(env, "wait_for_msg");
        } else {
            /* Previous value was STATE_FULL (CAS succeeded) -- but
               this advances the shared tail and retries instead of
               consuming. See NOTE above. */
            tail = __sync_add_and_fetch(&q->tail, 1);
        }
    }
    /* Copy term into calling process env. The NIF env can now be
       gargbage collected. */
    ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]);
    /* Mark the slot as free. Note: We don't increment the tail
       position here, as another thread also walking the buffer might
       have incremented it multiple times */
    q->slots_terms[SLOT_INDEX(tail, qsize)] = 0;
    if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)],
                                     STATE_READ,
                                     STATE_EMPTY)) {
        return enif_make_tuple2(env, mk_atom(env, "ok"), copy);
    } else {
        return mk_error(env, "could_not_update_slots_after_pop");
    }
}
static ERL_NIF_TERM | |||||
queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size); | |||||
for (int i = 0; i < q->queue_size; i++) { | |||||
slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]); | |||||
if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) { | |||||
slots_terms[i] = mk_atom(env, "null"); | |||||
} else { | |||||
slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]); | |||||
} | |||||
} | |||||
return enif_make_tuple4(env, | |||||
enif_make_uint64(env, q->tail), | |||||
enif_make_uint64(env, q->head), | |||||
enif_make_list_from_array(env, slots_states, q->queue_size), | |||||
enif_make_list_from_array(env, slots_terms, q->queue_size)); | |||||
} | |||||
static ERL_NIF_TERM | |||||
queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint32_t queue_id = 0; | |||||
if (!enif_get_uint(env, argv[0], &queue_id)) | |||||
return mk_error(env, "badarg"); | |||||
if (queue_id > 8) | |||||
return mk_error(env, "badarg"); | |||||
cq_t *q = QUEUES[queue_id]; | |||||
if (q == NULL) | |||||
return mk_error(env, "bad_queue_id"); | |||||
uint64_t pop_queue_size = 0; | |||||
cq_node_t *node = q->pop_queue->head; | |||||
if (node->value == NULL) { | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
while (node != NULL) { | |||||
pop_queue_size++; | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size); | |||||
node = q->pop_queue->head; | |||||
node = Q_PTR(node); | |||||
if (node->value == NULL) { | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
uint64_t i = 0; | |||||
while (node != NULL) { | |||||
if (node->value == 0) { | |||||
pop_queue_pids[i] = mk_atom(env, "null"); | |||||
} | |||||
else { | |||||
pop_queue_pids[i] = enif_make_pid(env, node->value); | |||||
} | |||||
i++; | |||||
node = node->next; | |||||
node = Q_PTR(node); | |||||
} | |||||
ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size); | |||||
enif_free(pop_queue_pids); | |||||
return list; | |||||
} | |||||
static ERL_NIF_TERM | |||||
print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
uint64_t *p1 = malloc(8); | |||||
*p1 = 0; | |||||
for (int bit = 63; bit >= 0; bit--) { | |||||
uint64_t power = 1 << bit; | |||||
//uint64_t byte = *p1; | |||||
uint64_t byte = p1; | |||||
fprintf(stderr, "%d", (byte & power) >> bit); | |||||
} | |||||
fprintf(stderr, "\n"); | |||||
//enif_free(p1); | |||||
return mk_atom(env, "ok"); | |||||
} | |||||
/* Destructor callback for "cq" resources. Actual cleanup is not
   implemented yet; this only logs that the callback fired. */
void free_resource(ErlNifEnv* env, void* arg)
{
    //cq_t *cq = (cq_t *) arg;
    fputs("free_resource\n", stderr);
}
cq_queue_t * new_queue() | |||||
{ | |||||
cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t)); | |||||
cq_node_t *node = enif_alloc(sizeof(cq_node_t)); | |||||
node->next = NULL; | |||||
//node->env = NULL; | |||||
node->value = NULL; | |||||
queue->head = node; | |||||
queue->tail = node; | |||||
return queue; | |||||
} | |||||
/* Append 'pid' to a waiting-process queue.
   Michael & Scott lock-free enqueue; the low bits of each pointer
   carry a small version count (Q_COUNT) as weak ABA protection.
   Fixes vs. the original:
     - the 'break' sat outside the if-body and ran even when the link
       CAS failed, abandoning the retry loop and losing the node under
       contention;
     - 'tail->next' dereferenced the *counted* pointer; the count bits
       must be stripped (Q_PTR) before dereferencing;
     - debug fprintf calls (which printed pointers with %lu, undefined
       behavior) removed. */
void enqueue(cq_queue_t *queue, ErlNifPid *pid)
{
    cq_node_t *node = enif_alloc(sizeof(cq_node_t));
    node->env = NULL;
    node->value = pid;
    node->next = NULL;
    cq_node_t *tail = NULL;
    uint64_t tail_count = 0;
    while (1) {
        tail = queue->tail;
        cq_node_t *tail_ptr = Q_PTR(tail);
        tail_count = Q_COUNT(tail);
        cq_node_t *next = tail_ptr->next;
        cq_node_t *next_ptr = Q_PTR(next);
        uint64_t next_count = Q_COUNT(next);
        if (tail == queue->tail) {
            if (next_ptr == NULL) {
                /* Tail really points at the last node: try to link the
                   new node in. Leave the loop only on success. */
                if (__sync_bool_compare_and_swap(&tail_ptr->next,
                                                 next,
                                                 Q_SET_COUNT(node, next_count+1)))
                    break;
            } else {
                /* Tail is lagging behind; help swing it forward, then
                   retry. */
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, next_count+1));
            }
        }
    }
    /* Swing tail to the freshly linked node; failure means another
       thread already helped, which is fine. */
    __sync_bool_compare_and_swap(&queue->tail,
                                 tail,
                                 Q_SET_COUNT(node, tail_count+1));
}
/* Pop one waiting pid from 'queue' into *pid.
   Michael & Scott lock-free dequeue with version counts packed into
   the low pointer bits (Q_COUNT). Returns 0 if the queue is empty
   (only the dummy node present), 1 on success.
   NOTE(review): 'head->next' below dereferences the counted pointer
   without Q_PTR -- crashes if the count bits are ever nonzero on
   head; confirm against enqueue's counting scheme.
   NOTE(review): the dequeued node (and the pid it carries) is never
   freed -- see the commented-out enif_free at the bottom. */
int dequeue(cq_queue_t *queue, ErlNifPid **pid)
{
    fprintf(stderr, "dequeue\n");
    cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr;
    while (1) {
        /* Re-read the queue ends each attempt; retry whenever a
           concurrent thread moved them under us. */
        head = queue->head;
        head_ptr = Q_PTR(head);
        tail = queue->tail;
        tail_ptr = Q_PTR(tail);
        next = head->next;
        next_ptr = Q_PTR(next);
        /* NOTE(review): %lu with pointer arguments is undefined
           behavior; should be %p. Debug output only. */
        fprintf(stderr, "head %lu, tail %lu, next %lu\n", head, tail, next);
        if (head == queue->head) {
            if (head_ptr == tail_ptr) {
                if (next_ptr == NULL) {
                    return 0; /* Queue is empty */
                }
                /* Tail is lagging; help swing it forward and retry. */
                fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n");
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1));
            } else {
                /* Read the value before the CAS: after a successful
                   CAS another dequeuer may free/reuse the node. */
                fprintf(stderr, "next->value %lu\n", next_ptr->value);
                *pid = next_ptr->value;
                fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n");
                if (__sync_bool_compare_and_swap(&queue->head,
                                                 head,
                                                 Q_SET_COUNT(next_ptr, Q_COUNT(head)+1)))
                    break;
            }
        }
    }
    // free pid
    //enif_free(Q_PTR(head));
    return 1;
}
/* NIF load callback: allocate the global id->queue table and register
   the "cq" resource type. Returns 0 on success, -1 on failure. */
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) {
    /* Global array mapping queue id to cq_t pointer: 8 slots, so
       valid queue ids are 0..7. (Was sizeof(cq_t **) -- same size on
       every real platform, but the element type is cq_t *.) */
    QUEUES = (cq_t **) calloc(8, sizeof(cq_t *));
    if (QUEUES == NULL)
        return -1;
    ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
    CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq",
                                          &free_resource, flags, NULL);
    if (CQ_RESOURCE == NULL)
        return -1;
    return 0;
}
/* NIF dispatch table: {erlang_function_name, arity, c_function}.
   Names and arities must match the stubs in the Erlang 'cq' module. */
static ErlNifFunc nif_funcs[] = {
    {"new"      , 3, queue_new},
    {"free"     , 1, queue_free},
    {"push"     , 2, queue_push},
    {"async_pop", 1, queue_async_pop},
    {"debug"    , 1, queue_debug},
    {"debug_poppers", 1, queue_debug_poppers},
    {"print_bits", 0, print_bits}
};

/* Register the NIFs for module 'cq'. 'load' runs at load time; the
   reload/upgrade/unload hooks are unused (NULL). */
ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL);
@ -0,0 +1,71 @@ | |||||
#include <stdint.h>
#include "erl_nif.h"

/* One cache line (bytes) per slot, used to space out the shared
   arrays and avoid false sharing between cores. */
#define CACHE_LINE_SIZE 64

/* Map a monotonically increasing position onto a ring-buffer index.
   Requires __size to be a power of two. Fully parenthesized: the
   original expanded to '__index & (__size - 1)' unparenthesized and
   was unsafe inside larger expressions. */
#define SLOT_INDEX(__index, __size) ((__index) & ((__size) - 1))

/* The low two bits of a cq_node_t* carry a version counter used to
   reduce ABA problems in the lock-free queues; nodes must therefore
   be at least 4-byte aligned. All macros fully parenthesized. */
#define Q_MASK 3L
#define Q_PTR(__ptr) ((cq_node_t *) (((uint64_t)(__ptr)) & (~Q_MASK)))
#define Q_COUNT(__ptr) (((uint64_t)(__ptr)) & Q_MASK)
#define Q_SET_COUNT(__ptr, __val) ((cq_node_t *) (((uint64_t)(__ptr)) | ((__val) & Q_MASK)))

/* Slot lifecycle: EMPTY -> WRITE (producer owns) -> FULL ->
   READ (consumer owns) -> EMPTY. */
#define STATE_EMPTY 0
#define STATE_WRITE 1
#define STATE_READ 2
#define STATE_FULL 3

/* NOTE(review): CQ_RESOURCE and QUEUES are *definitions*, not extern
   declarations; including this header from more than one translation
   unit will fail to link. Consider 'extern' here plus a single
   definition in the .c file. */
ErlNifResourceType* CQ_RESOURCE;

typedef struct cq_node cq_node_t;

/* Node in a waiting-process queue. */
struct cq_node {
    ErlNifEnv *env;
    //ERL_NIF_TERM term;
    ErlNifPid *value;   /* NULL for the dummy node */
    cq_node_t *next;
};

/* Michael&Scott-style queue of waiting Erlang processes. */
typedef struct cq_queue {
    cq_node_t *head;
    cq_node_t *tail;
} cq_queue_t;

// TODO: Add padding between the fields
typedef struct cq {
    uint32_t id;              /* index into QUEUES */
    uint64_t queue_size;      /* number of ring slots (power of two) */
    uint64_t overflow_size;
    uint64_t head;            /* producer position */
    uint64_t tail;            /* consumer position */
    uint8_t *slots_states;    /* STATE_* per slot */
    ERL_NIF_TERM *slots_terms;
    ErlNifEnv **slots_envs;   /* per-slot envs owning slots_terms */
    cq_queue_t *push_queue;   /* producers blocked on a full queue */
    cq_queue_t *pop_queue;    /* consumers blocked on an empty queue */
    uint8_t *overflow_states;
    ERL_NIF_TERM *overflow_terms;
    ErlNifEnv **overflow_envs;
} cq_t;

cq_t **QUEUES = NULL; /* Initialized on nif load */

ERL_NIF_TERM mk_atom(ErlNifEnv* env, const char* atom);
ERL_NIF_TERM mk_error(ErlNifEnv* env, const char* msg);
int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
void free_resource(ErlNifEnv*, void*);
cq_queue_t* new_queue(void);
void enqueue(cq_queue_t *q, ErlNifPid *pid);
@ -1,2 +1,8 @@ | |||||
{erl_opts, [no_debug_info]}. | |||||
{deps, []}. | |||||
%% Compiler options: omit debug_info from beams; add "include" to the
%% include search path.
{erl_opts, [no_debug_info, {i, "include"}]}.
{deps, []}.
%% Build the C NIF shared library before compiling (Linux only).
{pre_hooks,
 [{"linux", compile, "make -C c_src all"}]}.
%% Remove NIF build artifacts on clean (Linux only).
{post_hooks,
 [{"linux", clean, "make -C c_src clean"}]}.
@ -0,0 +1,64 @@ | |||||
# 描述 | |||||
NIF库包含Erlang模块的某些功能的本机实现。像其他任何函数一样,调用本机实现的函数(NIF),与调用方没有任何区别。 | |||||
NIF库被构建为动态链接的库文件,并通过调用erlang:load_nif / 2在运行时加载。 | |||||
警告 | |||||
谨慎使用此功能。 | |||||
本机功能(NIF)是作为 VM 本机代码的直接扩展来执行的,并非运行在受保护的安全环境中。VM 无法提供与执行 Erlang 代码时相同的服务,
例如抢先式调度或内存保护。如果本机功能运行不正常,则整个VM都会出现异常。 | |||||
崩溃的本机功能将使整个VM崩溃。 | |||||
错误实现的本机功能可能会导致VM内部状态不一致,从而导致VM崩溃或在调用本机功能后的任何时候VM的其他异常行为。 | |||||
在返回之前进行长时间工作的本机功能会降低VM的响应能力,并可能导致其他奇怪的行为。这种奇怪的行为包括但不限于极端的内存 | |||||
使用情况以及调度程序之间的不良负载平衡。在Erlang / OTP发行版之间,由于冗长的工作而可能发生的奇怪行为也会有所不同。 | |||||
# 简单示例 | |||||
``` | |||||
/* niftest.c */ | |||||
#include <erl_nif.h> | |||||
static ERL_NIF_TERM hello(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
return enif_make_string(env, "Hello world!", ERL_NIF_LATIN1); | |||||
} | |||||
static ErlNifFunc nif_funcs[] = | |||||
{ | |||||
{"hello", 0, hello} | |||||
}; | |||||
ERL_NIF_INIT(niftest,nif_funcs,NULL,NULL,NULL,NULL) | |||||
``` | |||||
``` | |||||
-module(niftest). | |||||
-export([init/0, hello/0]). | |||||
-on_load(init/0). | |||||
init() -> | |||||
erlang:load_nif("./niftest", 0). | |||||
hello() -> | |||||
erlang:nif_error("NIF library not loaded"). | |||||
``` | |||||
在上面的示例中,使用了on_load指令,该命令功能是在加载模块时自动调用的指定的函数-init/0。 | |||||
init函数初始化依次调用erlang:load_nif / 2 , | |||||
该加载器会加载NIF库,并用C中的本机实现替换hello函数。加载后,NIF库将保持不变。在清除它所属的模块代码版本之前,不会将其卸载。 | |||||
如果在成功加载NIF库之前调用了该函数,则每个NIF必须具有用Erlang调用的实现。典型的此类存根实现是调用erlang:nif_error, | |||||
这将引发异常。如果NIF库缺少某些操作系统或硬件体系结构的实现,则Erlang函数也可以用作后备实现。 | |||||
注意 | |||||
NIF不必导出,它可以在模块本地。但是,编译器会优化未使用的本地存根函数,从而导致NIF库的加载失败。 | |||||
# 功能性 | |||||
NIF代码和Erlang运行时系统之间的所有交互都是通过调用NIF API函数来执行的。存在以下功能的功能: | |||||
读写Erlang术语 | |||||
任何Erlang术语都可以作为函数参数传递给NIF,并作为函数返回值返回。这些术语属于C类型的 ERL_NIF_TERM,只能使用API函数读取或写入。大部分用于读取术语内容的函数都以enif_get_为前缀,并且如果该术语属于预期类型(或非预期类型),则通常返回 true(或false)。编写术语的函数都带有enif_make_前缀 ,通常返回创建的ERL_NIF_TERM。还有一些查询术语的函数,例如enif_is_atom,enif_is_identical和enif_compare。 | |||||
类型的所有方面ERL_NIF_TERM属于类型的环境ErlNifEnv。术语的生存期由其环境对象的生存期控制。读取或写入术语的所有API函数都将术语所属的环境作为第一个函数参数 | |||||
增加和减少资源的引用计数的次数必须匹配,否则可能引发问题。 | |||||
至此,持久资源的主要接口的实现就介绍完了,用户使用时,可以先通过enif_open_resource_type建立资源类型的描述符, | |||||
然后利用此描述符,使用enif_alloc_resource分配资源所占用的内存空间,使用enif_make_resource将资源导出到erlang模块层, | |||||
在进程间传递资源描述符,资源再传回NIF时,可以通过enif_get_resource取回资源描述符中的资源数据结构, | |||||
同时可以通过enif_keep_resource来共享资源,通过enif_release_resource来放弃使用资源,gc系统也会正确回收引用计数为0的资源, | |||||
开发者再也不用担心内存没有被正确释放了。 | |||||
持久资源为NIF的开发带来了极大的便利,用户可以将一些大规模的数据结构一次传入内存,生成一个资源描述符, | |||||
然后在进程间传递资源描述符而不是资源数据本身,减轻每次资源数据拷贝的开销,同时持久资源也是线程安全的, | |||||
写erlang程序也可以像写c程序一样高效了。 | |||||
@ -0,0 +1,27 @@ | |||||
# window下编译nif dll 需要安装vs | |||||
## 第一种 makefile配置(可参考jiffy的Makefile) 命令行下编译dll 需要设置vs相关环境变量 | |||||
具体要设置的环境变量可参考下面几个 | |||||
``` | |||||
path 新增 | |||||
D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\bin\Hostx64\x64 | |||||
LIB | |||||
D:\Windows Kits\10\Lib\10.0.18362.0\ucrt\x64 | |||||
D:\Windows Kits\10\Lib\10.0.18362.0\ucrt\x64 | |||||
D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\lib\x64 | |||||
INCLUDE | |||||
D:\Windows Kits\10\Include\10.0.18362.0\ucrt | |||||
D:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.24.28314\include | |||||
``` | |||||
## 第二种 在vs单独编译 然后拷贝使用 | |||||
VS编译 | |||||
1 新建空项目或者从现有代码创建项目 | |||||
2 先选择 编辑框上边的 解决方案配置 与 解决方案平台 | |||||
3 右键项目属性 设置 配置与第2步 解决方案配置 一样 设置 平台与第二步设置的 解决方案平台 一样 | |||||
4 右键项目属性 配置属性 -> 常规 -> 配置类型 ->动态库(.dll) | |||||
5 右键项目属性 配置属性 -> VC++目录 -> 包含目录 新增 D:\Program Files\erl10.6\erts-10.6\include | |||||
6 右键项目属性 生成 | |||||
注意:编译时使用的 Erlang include 目录要和实际运行使用的 Erlang 版本相对应。