#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

#include "erl_nif.h"
#include "cq_nif.h"

/*
#ifndef ERL_NIF_DIRTY_SCHEDULER_SUPPORT
# error Requires dirty schedulers
#endif
*/

ERL_NIF_TERM
mk_atom(ErlNifEnv* env, const char* atom)
{
    ERL_NIF_TERM ret;

    if (!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1))
        return enif_make_atom(env, atom);

    return ret;
}

ERL_NIF_TERM
mk_error(ErlNifEnv* env, const char* mesg)
{
    return enif_make_tuple2(env, mk_atom(env, "error"), mk_atom(env, mesg));
}

static ERL_NIF_TERM
queue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    cq_t *q = enif_alloc_resource(CQ_RESOURCE, sizeof(cq_t));
    if (q == NULL)
        return mk_error(env, "priv_alloc_error");

    ERL_NIF_TERM ret = enif_make_resource(env, q);
    /* enif_release_resource(q); */

    uint32_t queue_id = 0;
    uint32_t queue_size = 0;
    uint32_t overflow_size = 0;

    if (!enif_get_uint(env, argv[0], &queue_id) ||
        !enif_get_uint(env, argv[1], &queue_size) ||
        !enif_get_uint(env, argv[2], &overflow_size))
        return mk_error(env, "badarg");

    if (queue_id >= 8)
        return mk_error(env, "bad_queue_id");

    /* TODO: Check that queue_size is a power of 2 */

    if (QUEUES[queue_id] != NULL)
        return mk_error(env, "queue_id_already_exists");

    q->id = queue_id;
    q->queue_size = queue_size;
    q->overflow_size = overflow_size;
    q->tail = 0;
    q->head = 0;
    q->slots_states = calloc(q->queue_size, CACHE_LINE_SIZE);
    q->slots_terms = calloc(q->queue_size, CACHE_LINE_SIZE);
    q->slots_envs = calloc(q->queue_size, CACHE_LINE_SIZE);
    q->overflow_terms = calloc(q->overflow_size, CACHE_LINE_SIZE);
    q->overflow_envs = calloc(q->overflow_size, CACHE_LINE_SIZE);
    q->push_queue = new_queue();
    q->pop_queue = new_queue();
    /* TODO: Check calloc return */

    for (int i = 0; i < q->queue_size; i++) {
        ErlNifEnv *slot_env = enif_alloc_env();
        q->slots_envs[i*CACHE_LINE_SIZE] = slot_env;
        //q->overflow_envs[i*CACHE_LINE_SIZE] = (ErlNifEnv *) enif_alloc_env();
    }

    QUEUES[q->id] = q;

    return enif_make_tuple2(env, mk_atom(env, "ok"), ret);
}

static ERL_NIF_TERM
queue_free(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id >= 8)
        return mk_error(env, "badarg");

    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");

    /* TODO: Free all the things! */
    QUEUES[queue_id] = NULL;

    return enif_make_atom(env, "ok");
}
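/* The slot state machine driven by queue_push/queue_async_pop below
 * (the STATE_* constants and the SLOT_INDEX macro are defined in
 * cq_nif.h):
 *
 *   producer: STATE_EMPTY --CAS--> STATE_WRITE --write term--> STATE_FULL
 *   consumer: STATE_FULL  --CAS--> STATE_READ  --copy term---> STATE_EMPTY
 *
 * Each slot's state is padded to its own cache line (hence the
 * i * CACHE_LINE_SIZE indexing throughout) to avoid false sharing
 * between cores. */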
/* Push to the head of the queue. */
static ERL_NIF_TERM
queue_push(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id >= 8)
        return mk_error(env, "badarg");

    /* Load the queue */
    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");
    if (q->id != queue_id)
        return mk_error(env, "not_identical_queue_id");

    for (int i = 0; i < q->queue_size; i++) {
        fprintf(stderr, "queue slot %d, index %d, state %d\n",
                i, i*CACHE_LINE_SIZE, (int) q->slots_states[i*CACHE_LINE_SIZE]);
    }

    /* If there are consumers waiting, the queue must be empty and we
       should directly pick a consumer to notify. */
    ErlNifPid *waiting_consumer;
    int dequeue_ret = dequeue(q->pop_queue, &waiting_consumer);
    if (dequeue_ret) {
        ErlNifEnv *msg_env = enif_alloc_env();
        ERL_NIF_TERM copy = enif_make_copy(msg_env, argv[1]);
        ERL_NIF_TERM tuple = enif_make_tuple2(msg_env, mk_atom(env, "pop"), copy);

        if (enif_send(env, waiting_consumer, msg_env, tuple)) {
            enif_free_env(msg_env);
            return mk_atom(env, "ok");
        } else {
            enif_free_env(msg_env);
            return mk_error(env, "notify_failed");
        }
    }

    /* Increment head and attempt to claim the slot by marking it as busy.
       This ensures no other thread will attempt to modify this slot. If we
       cannot claim it, another thread must have claimed it first and we
       retry on the next slot. */
    uint64_t head = __sync_add_and_fetch(&q->head, 1);
    size_t size = q->queue_size;

    while (1) {
        uint64_t index = SLOT_INDEX(head, size);
        uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index],
                                                   STATE_EMPTY,
                                                   STATE_WRITE);
        if (ret == STATE_EMPTY) {
            /* The CAS succeeded: we acquired the write lock, go ahead
               with the write. */
            break;
        } else if (ret == STATE_WRITE) {
            /* Another producer is writing to this slot. Move on and try
               to claim the next one. */
            head = __sync_add_and_fetch(&q->head, 1);
        } else {
            /* STATE_FULL: We have caught up with the tail and the buffer
               is full. Block the producer until a consumer reads the
               item. */
            return mk_error(env, "full_not_implemented");
        }
    }

    /* If head catches up with tail, the queue is full. Add to overflow
       instead. */

    /* Copy term to slot-specific temporary process env. */
    ERL_NIF_TERM copy = enif_make_copy(q->slots_envs[SLOT_INDEX(head, size)], argv[1]);
    q->slots_terms[SLOT_INDEX(head, size)] = copy;
    __sync_synchronize(); /* Or compiler memory barrier? */

    /* TODO: Do we need to collect garbage? */

    /* Mark the slot ready to be consumed */
    if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(head, size)],
                                     STATE_WRITE,
                                     STATE_FULL)) {
        return mk_atom(env, "ok");
    } else {
        return mk_error(env, "could_not_update_slots_after_insert");
    }
}
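/* Pop protocol, as implemented below together with queue_push above: a
 * consumer first tries to claim a full slot with a CAS. If the ring is
 * empty, it registers its pid on q->pop_queue and returns wait_for_msg;
 * the calling process is then expected to block in receive until a
 * producer hands it a {pop, Term} message directly via enif_send. */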
static ERL_NIF_TERM
queue_async_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    /* Load queue */
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id >= 8)
        return mk_error(env, "badarg");

    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");
    if (q->id != queue_id)
        return mk_error(env, "not_identical_queue_id");

    uint64_t qsize = q->queue_size;
    uint64_t tail = q->tail;
    uint64_t num_busy = 0;

    /* Walk the buffer starting at the tail position until we are either
       able to consume a term or find an empty slot. */
    while (1) {
        uint64_t index = SLOT_INDEX(tail, qsize);
        uint64_t ret = __sync_val_compare_and_swap(&q->slots_states[index],
                                                   STATE_FULL,
                                                   STATE_READ);
        if (ret == STATE_FULL) {
            /* The CAS succeeded and marked the slot as read in progress.
               We now have an exclusive lock. */
            break;
        } else if (ret == STATE_WRITE) {
            /* We found an item with a write in progress. If that thread
               progresses, it will eventually mark the slot as full. We
               could spin until that happens, but it can take an arbitrary
               amount of time and multiple reading threads would compete
               for the same slot.

               Instead we add the caller to the queue of blocking
               consumers. When the next producer comes along, it will
               "help" this thread by calling enif_send on the current
               in-progress term *and* handle its own term.

               If there's no new push to the queue, this will block
               forever. */
            return mk_atom(env, "write_in_progress_not_implemented");
        } else if (ret == STATE_EMPTY) {
            /* We found an empty slot, so the queue must be empty. Add the
               calling Erlang consumer process to the queue of waiting
               processes. When the next producer comes along, it first
               checks the waiting consumers and calls enif_send instead of
               writing to the slots. */
            ErlNifPid *pid = enif_alloc(sizeof(ErlNifPid));
            pid = enif_self(env, pid);
            enqueue(q->pop_queue, pid);
            return mk_atom(env, "wait_for_msg");
        } else {
            /* STATE_READ: another consumer claimed this slot first. Move
               on to the next one. */
            tail = __sync_add_and_fetch(&q->tail, 1);
        }
    }

    /* Copy the term into the calling process env. The NIF env can now be
       garbage collected. */
    ERL_NIF_TERM copy = enif_make_copy(env, q->slots_terms[SLOT_INDEX(tail, qsize)]);

    /* Mark the slot as free. Note: We don't increment the tail position
       here, as another thread also walking the buffer might have
       incremented it multiple times. */
    q->slots_terms[SLOT_INDEX(tail, qsize)] = 0;
    if (__sync_bool_compare_and_swap(&q->slots_states[SLOT_INDEX(tail, qsize)],
                                     STATE_READ,
                                     STATE_EMPTY)) {
        return enif_make_tuple2(env, mk_atom(env, "ok"), copy);
    } else {
        return mk_error(env, "could_not_update_slots_after_pop");
    }
}

static ERL_NIF_TERM
queue_debug(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id >= 8)
        return mk_error(env, "badarg");

    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");

    ERL_NIF_TERM *slots_states = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size);
    ERL_NIF_TERM *slots_terms = enif_alloc(sizeof(ERL_NIF_TERM) * q->queue_size);

    for (int i = 0; i < q->queue_size; i++) {
        slots_states[i] = enif_make_int(env, q->slots_states[i * CACHE_LINE_SIZE]);

        if (q->slots_terms[i * CACHE_LINE_SIZE] == 0) {
            slots_terms[i] = mk_atom(env, "null");
        } else {
            slots_terms[i] = enif_make_copy(env, q->slots_terms[i * CACHE_LINE_SIZE]);
        }
    }

    return enif_make_tuple4(env,
                            enif_make_uint64(env, q->tail),
                            enif_make_uint64(env, q->head),
                            enif_make_list_from_array(env, slots_states, q->queue_size),
                            enif_make_list_from_array(env, slots_terms, q->queue_size));
}

static ERL_NIF_TERM
queue_debug_poppers(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint32_t queue_id = 0;
    if (!enif_get_uint(env, argv[0], &queue_id))
        return mk_error(env, "badarg");
    if (queue_id >= 8)
        return mk_error(env, "badarg");

    cq_t *q = QUEUES[queue_id];
    if (q == NULL)
        return mk_error(env, "bad_queue_id");

    /* First pass: count the waiting consumers, skipping the value-less
       sentinel node at the head. */
    uint64_t pop_queue_size = 0;
    cq_node_t *node = Q_PTR(q->pop_queue->head);
    if (node->value == NULL) {
        node = Q_PTR(node->next);
    }

    while (node != NULL) {
        pop_queue_size++;
        node = Q_PTR(node->next);
    }

    /* Second pass: collect the pids. */
    ERL_NIF_TERM *pop_queue_pids = enif_alloc(sizeof(ERL_NIF_TERM) * pop_queue_size);

    node = Q_PTR(q->pop_queue->head);
    if (node->value == NULL) {
        node = Q_PTR(node->next);
    }

    uint64_t i = 0;
    while (node != NULL) {
        if (node->value == 0) {
            pop_queue_pids[i] = mk_atom(env, "null");
        } else {
            pop_queue_pids[i] = enif_make_pid(env, node->value);
        }
        i++;
        node = Q_PTR(node->next);
    }

    ERL_NIF_TERM list = enif_make_list_from_array(env, pop_queue_pids, pop_queue_size);
    enif_free(pop_queue_pids);
    return list;
}

static ERL_NIF_TERM
print_bits(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
    uint64_t *p1 = malloc(sizeof(uint64_t));
    *p1 = 0;

    for (int bit = 63; bit >= 0; bit--) {
        uint64_t power = 1ULL << bit;
        uint64_t byte = *p1;
        fprintf(stderr, "%d", (int) ((byte & power) >> bit));
    }
    fprintf(stderr, "\n");

    free(p1);
    return mk_atom(env, "ok");
}

void
free_resource(ErlNifEnv* env, void* arg)
{
    //cq_t *cq = (cq_t *) arg;
    fprintf(stderr, "free_resource\n");
}

cq_queue_t *
new_queue()
{
    cq_queue_t *queue = enif_alloc(sizeof(cq_queue_t));
    cq_node_t *node = enif_alloc(sizeof(cq_node_t));

    /* The queue starts with a single sentinel node carrying no value;
       head and tail both point to it. */
    node->next = NULL;
    //node->env = NULL;
    node->value = NULL;
    queue->head = node;
    queue->tail = node;

    return queue;
}
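/* enqueue/dequeue below follow the Michael-Scott lock-free queue
 * algorithm. Q_PTR, Q_COUNT and Q_SET_COUNT are assumed (in cq_nif.h) to
 * pack a modification counter into otherwise unused bits of a node
 * pointer, so every CAS compares the counter along with the pointer and
 * stale pointers (the ABA problem) are rejected. */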
void
enqueue(cq_queue_t *queue, ErlNifPid *pid)
{
    cq_node_t *node = enif_alloc(sizeof(cq_node_t));
    //node->env = enif_alloc_env();
    //node->term = enif_make_copy(node->env, term);
    node->value = pid;
    node->next = NULL;
    fprintf(stderr, "node %p\n", (void *) node);

    cq_node_t *tail = NULL;
    uint64_t tail_count = 0;

    while (1) {
        tail = queue->tail;
        cq_node_t *tail_ptr = Q_PTR(tail);
        tail_count = Q_COUNT(tail);

        cq_node_t *next = tail_ptr->next;
        cq_node_t *next_ptr = Q_PTR(next);
        uint64_t next_count = Q_COUNT(next);

        if (tail == queue->tail) {
            fprintf(stderr, "tail == queue->tail\n");
            if (next_ptr == NULL) {
                fprintf(stderr, "next_ptr == NULL\n");
                /* Tail was pointing at the last node; try to link in the
                   new node. Only leave the loop if the CAS succeeded. */
                if (__sync_bool_compare_and_swap(&tail_ptr->next,
                                                 next,
                                                 Q_SET_COUNT(node, next_count+1))) {
                    fprintf(stderr, "CAS(tail_ptr->next, next, (node, next_count+1)) -> true\n");
                    break;
                }
            } else {
                /* Tail was lagging behind; try to swing it forward. */
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, next_count+1));
                fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, next_count+1))\n");
            }
        }
    }

    /* Try to swing the tail to the newly inserted node. */
    cq_node_t *node_with_count = Q_SET_COUNT(node, tail_count+1);
    int ret = __sync_bool_compare_and_swap(&queue->tail, tail, node_with_count);
    fprintf(stderr, "CAS(queue->tail, tail, %p) -> %d\n", (void *) node_with_count, ret);
}

int
dequeue(cq_queue_t *queue, ErlNifPid **pid)
{
    fprintf(stderr, "dequeue\n");
    cq_node_t *head, *head_ptr, *tail, *tail_ptr, *next, *next_ptr;

    while (1) {
        head = queue->head;
        head_ptr = Q_PTR(head);
        tail = queue->tail;
        tail_ptr = Q_PTR(tail);
        next = head_ptr->next;
        next_ptr = Q_PTR(next);
        fprintf(stderr, "head %p, tail %p, next %p\n",
                (void *) head, (void *) tail, (void *) next);

        if (head == queue->head) {
            if (head_ptr == tail_ptr) {
                if (next_ptr == NULL) {
                    return 0; /* Queue is empty */
                }

                /* Tail is lagging behind; help it along. */
                fprintf(stderr, "CAS(queue->tail, tail, (next_ptr, tail+1))\n");
                __sync_bool_compare_and_swap(&queue->tail,
                                             tail,
                                             Q_SET_COUNT(next_ptr, Q_COUNT(tail)+1));
            } else {
                fprintf(stderr, "next->value %p\n", (void *) next_ptr->value);
                *pid = next_ptr->value;
                fprintf(stderr, "CAS(queue->head, head, (next_ptr, head+1))\n");
                if (__sync_bool_compare_and_swap(&queue->head,
                                                 head,
                                                 Q_SET_COUNT(next_ptr, Q_COUNT(head)+1)))
                    break;
            }
        }
    }

    // free pid
    //enif_free(Q_PTR(head));

    return 1;
}

int
load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
    /* Initialize global array mapping id to cq_t ptr */
    QUEUES = (cq_t **) calloc(8, sizeof(cq_t *));
    if (QUEUES == NULL)
        return -1;

    ErlNifResourceFlags flags = (ErlNifResourceFlags)(ERL_NIF_RT_CREATE |
                                                      ERL_NIF_RT_TAKEOVER);
    CQ_RESOURCE = enif_open_resource_type(env, "cq", "cq",
                                          &free_resource, flags, NULL);
    if (CQ_RESOURCE == NULL)
        return -1;

    return 0;
}

static ErlNifFunc nif_funcs[] = {
    {"new"          , 3, queue_new},
    {"free"         , 1, queue_free},
    {"push"         , 2, queue_push},
    {"async_pop"    , 1, queue_async_pop},
    {"debug"        , 1, queue_debug},
    {"debug_poppers", 1, queue_debug_poppers},
    {"print_bits"   , 0, print_bits}
};

ERL_NIF_INIT(cq, nif_funcs, load, NULL, NULL, NULL);
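/* Example Erlang-side session, assuming a cq wrapper module whose exports
 * map one-to-one onto nif_funcs above (the wrapper is not part of this
 * file):
 *
 *   {ok, _Ref} = cq:new(1, 8, 8),   %% queue id 1, 8 slots, 8 overflow slots
 *   wait_for_msg = cq:async_pop(1), %% ring is empty: register as waiter
 *   ok = cq:push(1, hello),         %% producer notifies the waiter directly
 *   receive {pop, Term} -> Term end %% consumer receives hello
 */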