@ -0,0 +1,62 @@ | |||||
#ifndef C_SRC_CRITICAL_SECTION_H_ | |||||
#define C_SRC_CRITICAL_SECTION_H_ | |||||
#include "erl_nif.h" | |||||
#include "macros.h" | |||||
class CriticalSection | |||||
{ | |||||
public: | |||||
CriticalSection() { } | |||||
virtual ~CriticalSection() {} | |||||
virtual void Enter() = 0; | |||||
virtual void Leave() = 0; | |||||
}; | |||||
class NullCriticalSection : public CriticalSection | |||||
{ | |||||
public: | |||||
NullCriticalSection() {} | |||||
~NullCriticalSection() {} | |||||
void Enter() override {} | |||||
void Leave() override {} | |||||
private: | |||||
DISALLOW_COPY_AND_ASSIGN(NullCriticalSection); | |||||
}; | |||||
class EnifCriticalSection : public CriticalSection | |||||
{ | |||||
public: | |||||
EnifCriticalSection() { mutex_ = enif_mutex_create(NULL);} | |||||
~EnifCriticalSection() {enif_mutex_destroy(mutex_);} | |||||
void Enter() override {enif_mutex_lock(mutex_);} | |||||
void Leave() override {enif_mutex_unlock(mutex_);} | |||||
private: | |||||
ErlNifMutex *mutex_; | |||||
DISALLOW_COPY_AND_ASSIGN(EnifCriticalSection); | |||||
}; | |||||
class CritScope | |||||
{ | |||||
public: | |||||
explicit CritScope(CriticalSection *pp) : pcrit_(pp) { pcrit_->Enter();} | |||||
~CritScope() {pcrit_->Leave();} | |||||
private: | |||||
CriticalSection *pcrit_; | |||||
DISALLOW_COPY_AND_ASSIGN(CritScope); | |||||
}; | |||||
#endif // C_SRC_CRITICAL_SECTION_H_ |
@ -0,0 +1,257 @@ | |||||
#include "epqueue.h" | |||||
#include "epqueue_item.h" | |||||
#include "epqueue_nif.h" | |||||
#include "priority_queue.h" | |||||
#include "critical_section.h" | |||||
#include "nif_utils.h" | |||||
#include "macros.h" | |||||
#include <string.h> | |||||
struct epqueue | |||||
{ | |||||
CriticalSection* crit; | |||||
PriorityQueue* queue; | |||||
ERL_NIF_TERM owner_pid; | |||||
}; | |||||
void nif_epqueue_free(ErlNifEnv* env, void* obj) | |||||
{ | |||||
UNUSED(env); | |||||
epqueue* qinst = static_cast<epqueue*>(obj); | |||||
if(qinst->queue) | |||||
delete qinst->queue; | |||||
if(qinst->crit) | |||||
delete qinst->crit; | |||||
} | |||||
bool internal_insert(epqueue* q, queue_item* item) | |||||
{ | |||||
CritScope ss(q->crit); | |||||
return q->queue->insert(item); | |||||
} | |||||
bool internal_remove(epqueue* q, queue_item* item) | |||||
{ | |||||
CritScope ss(q->crit); | |||||
if(item->heap_index == -1) | |||||
return false; | |||||
return q->queue->remove(item, item->heap_index); | |||||
} | |||||
queue_item* internal_pop(epqueue* q) | |||||
{ | |||||
CritScope ss(q->crit); | |||||
return static_cast<queue_item*>(q->queue->pop()); | |||||
} | |||||
bool is_owner(ErlNifEnv* env, epqueue* q) | |||||
{ | |||||
if(q->owner_pid == 0) | |||||
return true; | |||||
ErlNifPid self; | |||||
if(enif_self(env, &self) && !enif_is_identical(q->owner_pid, enif_make_pid(env, &self))) | |||||
return false; | |||||
return true; | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
if(!enif_is_list(env, argv[0])) | |||||
return enif_make_badarg(env); | |||||
bool use_lock = false; | |||||
ERL_NIF_TERM settings_list = argv[0]; | |||||
ERL_NIF_TERM head; | |||||
while(enif_get_list_cell(env, settings_list, &head, &settings_list)) | |||||
{ | |||||
const ERL_NIF_TERM *items; | |||||
int arity; | |||||
if(!enif_get_tuple(env, head, &arity, &items) || arity != 2) | |||||
return enif_make_badarg(env); | |||||
if(enif_is_identical(items[0], ATOMS.atomGlobalLock)) | |||||
use_lock = enif_is_identical(items[1], ATOMS.atomTrue); | |||||
else | |||||
return enif_make_badarg(env); | |||||
} | |||||
epqueue* qinst = static_cast<epqueue*>(enif_alloc_resource(data->resPQueueInstance, sizeof(epqueue))); | |||||
if(qinst == NULL) | |||||
return make_error(env, "enif_alloc_resource failed"); | |||||
memset(qinst, 0, sizeof(epqueue)); | |||||
if(use_lock) | |||||
{ | |||||
qinst->crit = new EnifCriticalSection(); | |||||
qinst->owner_pid = 0; | |||||
} | |||||
else | |||||
{ | |||||
qinst->crit = new NullCriticalSection(); | |||||
ErlNifPid self; | |||||
enif_self(env, &self); | |||||
qinst->owner_pid = enif_make_pid(env, &self); | |||||
} | |||||
qinst->queue = new PriorityQueue(epqueue_item_less, epqueue_item_update_pos, enif_release_resource); | |||||
ERL_NIF_TERM term = enif_make_resource(env, qinst); | |||||
enif_release_resource(qinst); | |||||
return enif_make_tuple2(env, ATOMS.atomOk, term); | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_size(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
epqueue* inst = NULL; | |||||
if(!enif_get_resource(env, argv[0], data->resPQueueInstance, reinterpret_cast<void**>(&inst))) | |||||
return enif_make_badarg(env); | |||||
if(!is_owner(env, inst)) | |||||
return enif_make_badarg(env); | |||||
CritScope ss(inst->crit); | |||||
return enif_make_int(env, inst->queue->size()); | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
epqueue* inst; | |||||
ErlNifBinary data_bin; | |||||
long priority; | |||||
if(!enif_get_resource(env, argv[0], data->resPQueueInstance, reinterpret_cast<void**>(&inst))) | |||||
return enif_make_badarg(env); | |||||
if(!is_owner(env, inst)) | |||||
return enif_make_badarg(env); | |||||
if(!enif_get_long(env, argv[2], &priority)) | |||||
return enif_make_badarg(env); | |||||
if(!enif_term_to_binary(env, argv[1], &data_bin)) | |||||
return enif_make_badarg(env); | |||||
queue_item* item = epqueue_item_new(data, data_bin, priority); | |||||
if(item == NULL) | |||||
{ | |||||
enif_release_binary(&data_bin); | |||||
return make_error(env, "failed to allocate a new item"); | |||||
} | |||||
if(!internal_insert(inst, item)) | |||||
{ | |||||
enif_release_resource(&item); | |||||
return make_error(env, "failed to insert new item in the queue"); | |||||
} | |||||
ERL_NIF_TERM ref = enif_make_resource(env, item); | |||||
return enif_make_tuple2(env, ATOMS.atomOk, ref); | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
epqueue* inst = NULL; | |||||
queue_item* item = NULL; | |||||
if(!enif_get_resource(env, argv[0], data->resPQueueInstance, reinterpret_cast<void**>(&inst))) | |||||
return enif_make_badarg(env); | |||||
if(!is_owner(env, inst)) | |||||
return enif_make_badarg(env); | |||||
if(!enif_get_resource(env, argv[1], data->resPQueueItem, reinterpret_cast<void**>(&item))) | |||||
return enif_make_badarg(env); | |||||
if(!internal_remove(inst, item)) | |||||
return ATOMS.atomFalse; | |||||
enif_release_resource(&item); | |||||
return ATOMS.atomTrue; | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
epqueue* inst = NULL; | |||||
if(!enif_get_resource(env, argv[0], data->resPQueueInstance, reinterpret_cast<void**>(&inst))) | |||||
return enif_make_badarg(env); | |||||
if(!is_owner(env, inst)) | |||||
return enif_make_badarg(env); | |||||
queue_item* item = internal_pop(inst); | |||||
if(item == NULL) | |||||
return ATOMS.atomUndefined; | |||||
ERL_NIF_TERM bin_term; | |||||
if(enif_binary_to_term(env, item->data.data, item->data.size, &bin_term, 0) == 0) | |||||
{ | |||||
enif_release_resource(item); | |||||
return make_error(env, "failed to decode data"); | |||||
} | |||||
enif_release_resource(item); | |||||
return enif_make_tuple3(env, ATOMS.atomOk, bin_term, enif_make_long(env, item->priority)); | |||||
} | |||||
ERL_NIF_TERM nif_epqueue_peek(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) | |||||
{ | |||||
UNUSED(argc); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_priv_data(env)); | |||||
epqueue* inst = NULL; | |||||
if(!enif_get_resource(env, argv[0], data->resPQueueInstance, reinterpret_cast<void**>(&inst))) | |||||
return enif_make_badarg(env); | |||||
if(!is_owner(env, inst)) | |||||
return enif_make_badarg(env); | |||||
CritScope ss(inst->crit); | |||||
queue_item* item = static_cast<queue_item*>(inst->queue->peek()); | |||||
if(item == NULL) | |||||
return ATOMS.atomUndefined; | |||||
ERL_NIF_TERM bin_term; | |||||
if(enif_binary_to_term(env, item->data.data, item->data.size, &bin_term, 0) == 0) | |||||
return make_error(env, "failed to decode data"); | |||||
return enif_make_tuple3(env, ATOMS.atomOk, bin_term, enif_make_int64(env, item->priority)); | |||||
} |
@ -0,0 +1,14 @@ | |||||
#ifndef C_SRC_EPQUEUE_H_ | |||||
#define C_SRC_EPQUEUE_H_ | |||||
#include "erl_nif.h" | |||||
void nif_epqueue_free(ErlNifEnv* env, void* obj); | |||||
ERL_NIF_TERM nif_epqueue_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_epqueue_size(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_epqueue_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_epqueue_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_epqueue_pop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
ERL_NIF_TERM nif_epqueue_peek(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); | |||||
#endif // C_SRC_EPQUEUE_H_ |
@ -0,0 +1,36 @@ | |||||
#include "epqueue_item.h" | |||||
#include "epqueue_nif.h" | |||||
#include "macros.h" | |||||
bool epqueue_item_less(void* ax, void* bx) | |||||
{ | |||||
queue_item* a = static_cast<queue_item*>(ax); | |||||
queue_item* b = static_cast<queue_item*>(bx); | |||||
return a->priority < b->priority; | |||||
} | |||||
void epqueue_item_update_pos(void* ax, int32_t pos) | |||||
{ | |||||
queue_item* a = static_cast<queue_item*>(ax); | |||||
a->heap_index = pos; | |||||
} | |||||
void epqueue_item_free(ErlNifEnv* env, void* obj) | |||||
{ | |||||
UNUSED(env); | |||||
queue_item* item = static_cast<queue_item*>(obj); | |||||
enif_release_binary(&item->data); | |||||
} | |||||
queue_item* epqueue_item_new(const epqueue_data* data, const ErlNifBinary& bin, uint64_t priority) | |||||
{ | |||||
queue_item* item = static_cast<queue_item*>(enif_alloc_resource(data->resPQueueItem, sizeof(queue_item))); | |||||
if(item == NULL) | |||||
return NULL; | |||||
item->heap_index = -1; | |||||
item->priority = priority; | |||||
item->data = bin; | |||||
return item; | |||||
} |
@ -0,0 +1,22 @@ | |||||
#ifndef C_SRC_EPQUEUE_ITEM_H_ | |||||
#define C_SRC_EPQUEUE_ITEM_H_ | |||||
#include "erl_nif.h" | |||||
#include <stdint.h> | |||||
struct epqueue_data; | |||||
struct queue_item | |||||
{ | |||||
int32_t heap_index; | |||||
uint64_t priority; | |||||
ErlNifBinary data; | |||||
}; | |||||
void epqueue_item_update_pos(void* ax, int32_t pos); | |||||
bool epqueue_item_less(void* ax, void* bx); | |||||
queue_item* epqueue_item_new(const epqueue_data* data, const ErlNifBinary& bin, uint64_t priority); | |||||
void epqueue_item_free(ErlNifEnv* env, void* obj); | |||||
#endif // C_SRC_EPQUEUE_ITEM_H_ |
@ -0,0 +1,74 @@ | |||||
#include <string.h> /* memcmp,strlen */ | |||||
#include "epqueue_nif.h" | |||||
#include "epqueue.h" | |||||
#include "epqueue_item.h" | |||||
#include "nif_utils.h" | |||||
#include "macros.h" | |||||
const char kAtomOk[] = "ok"; | |||||
const char kAtomError[] = "error"; | |||||
const char kAtomTrue[] = "true"; | |||||
const char kAtomFalse[] = "false"; | |||||
const char kAtomUndefined[] = "undefined"; | |||||
const char kAtomGlobalLock[] = "global_lock"; | |||||
atoms ATOMS; | |||||
void open_resources(ErlNifEnv* env, epqueue_data* data) | |||||
{ | |||||
ErlNifResourceFlags flags = static_cast<ErlNifResourceFlags>(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); | |||||
data->resPQueueInstance = enif_open_resource_type(env, NULL, "pqueue_instance", nif_epqueue_free, flags, NULL); | |||||
data->resPQueueItem = enif_open_resource_type(env, NULL, "pqueue_item", epqueue_item_free, flags, NULL); | |||||
} | |||||
int on_nif_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) | |||||
{ | |||||
UNUSED(load_info); | |||||
ATOMS.atomOk = make_atom(env, kAtomOk); | |||||
ATOMS.atomError = make_atom(env, kAtomError); | |||||
ATOMS.atomTrue = make_atom(env, kAtomTrue); | |||||
ATOMS.atomFalse = make_atom(env, kAtomFalse); | |||||
ATOMS.atomUndefined = make_atom(env, kAtomUndefined); | |||||
ATOMS.atomGlobalLock = make_atom(env, kAtomGlobalLock); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_alloc(sizeof(epqueue_data))); | |||||
open_resources(env, data); | |||||
*priv_data = data; | |||||
return 0; | |||||
} | |||||
void on_nif_unload(ErlNifEnv* env, void* priv_data) | |||||
{ | |||||
UNUSED(env); | |||||
epqueue_data* data = static_cast<epqueue_data*>(priv_data); | |||||
enif_free(data); | |||||
} | |||||
int on_nif_upgrade(ErlNifEnv* env, void** priv, void** old_priv, ERL_NIF_TERM info) | |||||
{ | |||||
UNUSED(old_priv); | |||||
UNUSED(info); | |||||
epqueue_data* data = static_cast<epqueue_data*>(enif_alloc(sizeof(epqueue_data))); | |||||
open_resources(env, data); | |||||
*priv = data; | |||||
return 0; | |||||
} | |||||
static ErlNifFunc nif_funcs[] = | |||||
{ | |||||
{"new", 1, nif_epqueue_new}, | |||||
{"insert", 3, nif_epqueue_insert}, | |||||
{"remove", 2, nif_epqueue_remove}, | |||||
{"pop", 1, nif_epqueue_pop}, | |||||
{"peek", 1, nif_epqueue_peek}, | |||||
{"size", 1, nif_epqueue_size} | |||||
}; | |||||
ERL_NIF_INIT(epqueue_nif, nif_funcs, on_nif_load, NULL, on_nif_upgrade, on_nif_unload) | |||||
@ -0,0 +1,25 @@ | |||||
#ifndef C_SRC_EPQUEUE_NIF_H_ | |||||
#define C_SRC_EPQUEUE_NIF_H_ | |||||
#include "erl_nif.h" | |||||
struct atoms | |||||
{ | |||||
ERL_NIF_TERM atomOk; | |||||
ERL_NIF_TERM atomError; | |||||
ERL_NIF_TERM atomTrue; | |||||
ERL_NIF_TERM atomFalse; | |||||
ERL_NIF_TERM atomUndefined; | |||||
ERL_NIF_TERM atomGlobalLock; | |||||
}; | |||||
struct epqueue_data | |||||
{ | |||||
ErlNifResourceType* resPQueueInstance; | |||||
ErlNifResourceType* resPQueueItem; | |||||
}; | |||||
extern atoms ATOMS; | |||||
#endif // C_SRC_EPQUEUE_NIF_H_ |
@ -0,0 +1,9 @@ | |||||
#ifndef C_SRC_MACROS_H_ | |||||
#define C_SRC_MACROS_H_ | |||||
#define UNUSED(expr) do { (void)(expr); } while (0) | |||||
#define DISALLOW_ASSIGN(TypeName) void operator=(const TypeName&) | |||||
#define DISALLOW_COPY_AND_ASSIGN(TypeName) TypeName(const TypeName&); DISALLOW_ASSIGN(TypeName) | |||||
#endif // C_SRC_MACROS_H_ |
@ -0,0 +1,27 @@ | |||||
#include "nif_utils.h" | |||||
#include "epqueue_nif.h" | |||||
#include <string.h> | |||||
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name) | |||||
{ | |||||
ERL_NIF_TERM ret; | |||||
if(enif_make_existing_atom(env, name, &ret, ERL_NIF_LATIN1)) | |||||
return ret; | |||||
return enif_make_atom(env, name); | |||||
} | |||||
ERL_NIF_TERM make_binary(ErlNifEnv* env, const char* buff, size_t length) | |||||
{ | |||||
ERL_NIF_TERM term; | |||||
unsigned char *destination_buffer = enif_make_new_binary(env, length, &term); | |||||
memcpy(destination_buffer, buff, length); | |||||
return term; | |||||
} | |||||
ERL_NIF_TERM make_error(ErlNifEnv* env, const char* error) | |||||
{ | |||||
return enif_make_tuple2(env, ATOMS.atomError, make_binary(env, error, strlen(error))); | |||||
} |
@ -0,0 +1,10 @@ | |||||
#ifndef C_SRC_NIF_UTILS_H_ | |||||
#define C_SRC_NIF_UTILS_H_ | |||||
#include "erl_nif.h" | |||||
ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name); | |||||
ERL_NIF_TERM make_error(ErlNifEnv* env, const char* error); | |||||
ERL_NIF_TERM make_binary(ErlNifEnv* env, const char* buff, size_t length); | |||||
#endif // C_SRC_NIF_UTILS_H_ |
@ -0,0 +1,148 @@ | |||||
#include "priority_queue.h" | |||||
#include "erl_nif.h" | |||||
#include <stdlib.h> | |||||
#include <string.h> | |||||
// Algorithm described at : http://robin-thomas.github.io/min-heap/ | |||||
#define left(x) 2 * x + 1 | |||||
#define right(x) 2 * x + 2 | |||||
#define parent(x) (x - 1) / 2 | |||||
#define less(a, b) less_(heap_[a], heap_[b]) | |||||
PriorityQueue::PriorityQueue(LessFun ls, UpdatePositionFun upd, DestroyElementFun dtor) : | |||||
capacity_(0), | |||||
length_(0), | |||||
heap_(NULL), | |||||
less_(ls), | |||||
update_pos_fun_(upd), | |||||
item_dtor_(dtor) { } | |||||
PriorityQueue::~PriorityQueue() | |||||
{ | |||||
if(heap_) | |||||
{ | |||||
for(int i = 0; i < length_; i++) | |||||
item_dtor_(heap_[i]); | |||||
enif_free(heap_); | |||||
} | |||||
} | |||||
bool PriorityQueue::insert(void* item) | |||||
{ | |||||
int pos; | |||||
if (length_ == capacity_) | |||||
{ | |||||
int new_capacity = (length_+1) * 2; | |||||
void** new_heap = reinterpret_cast<void**>(enif_alloc(sizeof(void*) * new_capacity)); | |||||
if (!new_heap) | |||||
return false; | |||||
memcpy(new_heap, heap_, sizeof(void*)*length_); | |||||
enif_free(heap_); | |||||
heap_ = new_heap; | |||||
capacity_ = new_capacity; | |||||
} | |||||
pos = (length_)++; | |||||
set(pos, item); | |||||
bubble_down(pos); | |||||
return true; | |||||
} | |||||
bool PriorityQueue::remove(void* item, int pos) | |||||
{ | |||||
if (pos >= length_) | |||||
return false; | |||||
if(heap_[pos] != item) | |||||
return false; | |||||
return remove(pos) == item; | |||||
} | |||||
void* PriorityQueue::remove(int pos) | |||||
{ | |||||
if (pos >= length_) | |||||
return NULL; | |||||
void* item = heap_[pos]; | |||||
length_--; | |||||
int ls = less(pos, length_); | |||||
set(pos, heap_[length_]); | |||||
if(ls) | |||||
bubble_up(pos); | |||||
else | |||||
bubble_down(pos); | |||||
// todo: resize down the heap in case we have a lot of empty slots | |||||
update_pos_fun_(item, -1); | |||||
return item; | |||||
} | |||||
void* PriorityQueue::peek() | |||||
{ | |||||
if(length_ == 0) | |||||
return NULL; | |||||
return heap_[0]; | |||||
} | |||||
// private | |||||
void PriorityQueue::set(int pos, void* item) | |||||
{ | |||||
heap_[pos] = item; | |||||
update_pos_fun_(item, pos); | |||||
} | |||||
void PriorityQueue::pos_swap(int pos1, int pos2) | |||||
{ | |||||
void* tmp = heap_[pos1]; | |||||
set(pos1, heap_[pos2]); | |||||
set(pos2, tmp); | |||||
} | |||||
void PriorityQueue::bubble_down(int pos) | |||||
{ | |||||
while(true) | |||||
{ | |||||
int parent = parent(pos); | |||||
if (pos == 0 || less(parent, pos)) | |||||
return; | |||||
pos_swap(pos, parent); | |||||
pos = parent; | |||||
} | |||||
} | |||||
void PriorityQueue::bubble_up(int pos) | |||||
{ | |||||
while (true) | |||||
{ | |||||
int left = left(pos); | |||||
int right = right(pos); | |||||
int smallest = pos; | |||||
if (left < length_ && less(left, smallest)) | |||||
smallest = left; | |||||
if (right < length_ && less(right, smallest)) | |||||
smallest = right; | |||||
if (smallest == pos) | |||||
return; | |||||
pos_swap(pos, smallest); | |||||
pos = smallest; | |||||
} | |||||
} |
@ -0,0 +1,41 @@ | |||||
#ifndef C_SRC_PRIORITY_QUEUE_H_ | |||||
#define C_SRC_PRIORITY_QUEUE_H_ | |||||
#include "macros.h" | |||||
typedef bool(*LessFun)(void*, void*); | |||||
typedef void(*UpdatePositionFun)(void*, int); | |||||
typedef void(*DestroyElementFun)(void*); | |||||
class PriorityQueue | |||||
{ | |||||
public: | |||||
PriorityQueue(LessFun ls, UpdatePositionFun upd, DestroyElementFun dtor); | |||||
~PriorityQueue(); | |||||
bool insert(void* item); | |||||
bool remove(void* item, int pos); | |||||
void* remove(int pos); | |||||
void* pop() {return remove(0);} | |||||
void* peek(); | |||||
int size() const { return length_;} | |||||
private: | |||||
inline void set(int pos, void* item); | |||||
inline void pos_swap(int pos1, int pos2); | |||||
void bubble_down(int pos); | |||||
void bubble_up(int pos); | |||||
int capacity_; | |||||
int length_; | |||||
void** heap_; | |||||
LessFun less_; | |||||
UpdatePositionFun update_pos_fun_; | |||||
DestroyElementFun item_dtor_; | |||||
DISALLOW_COPY_AND_ASSIGN(PriorityQueue); | |||||
}; | |||||
#endif // C_SRC_PRIORITY_QUEUE_H_ |
@ -0,0 +1,7 @@ | |||||
{port_specs, [ | |||||
{"../../priv/epqueue_nif.so", ["*.cc"]} | |||||
]}. | |||||
@ -0,0 +1,68 @@ | |||||
-module(benchmark). | |||||
-export([ | |||||
benchmark_serial/3, | |||||
benchmark_concurrent/3 | |||||
]). | |||||
benchmark_serial(Elements, MaxPriority, Lock) -> | |||||
rand:uniform(), %just to init the seed | |||||
{ok, Q} = epqueue:new([{global_lock, Lock}]), | |||||
{T0, ok} = timer:tc(fun() -> insert_none(Elements, MaxPriority) end), | |||||
{T1, ok} = timer:tc(fun() -> insert_item(Elements, Q, MaxPriority) end), | |||||
{T2, ok} = timer:tc(fun() -> remove_item(Q) end), | |||||
T0Ms = T0/1000, | |||||
T1Ms = T1/1000, | |||||
T2Ms = T2/1000, | |||||
io:format(<<"insert overhead: ~p ms insert time: ~p ms pop time: ~p ms ~n">>, [T0Ms, T1Ms, T2Ms]). | |||||
benchmark_concurrent(Procs, Elements, MaxPriority) -> | |||||
{ok, Q} = epqueue:new([{global_lock, true}]), | |||||
ElsPerProcess = round(Elements/Procs), | |||||
InsertNoneWorkFun = fun() -> | |||||
insert_none(ElsPerProcess, MaxPriority) | |||||
end, | |||||
InsertWorkFun = fun() -> | |||||
insert_item(ElsPerProcess, Q, MaxPriority) | |||||
end, | |||||
RemoveWorkFun = fun() -> | |||||
remove_item(Q) | |||||
end, | |||||
{T0, _} = timer:tc(fun()-> multi_spawn:do_work(InsertNoneWorkFun, Procs) end), | |||||
{T1, _} = timer:tc(fun()-> multi_spawn:do_work(InsertWorkFun, Procs) end), | |||||
{T2, _} = timer:tc(fun()-> multi_spawn:do_work(RemoveWorkFun, Procs) end), | |||||
T0Ms = T0/1000, | |||||
T1Ms = T1/1000, | |||||
T2Ms = T2/1000, | |||||
io:format(<<"insert overhead: ~p ms insert time: ~p ms pop time: ~p ms ~n">>, [T0Ms, T1Ms, T2Ms]). | |||||
insert_item(0, _Q, _Max) -> | |||||
ok; | |||||
insert_item(N, Q, Max) -> | |||||
El = rand:uniform(Max), | |||||
{ok, _} = epqueue:insert(Q, El, El), | |||||
insert_item(N-1, Q, Max). | |||||
remove_item(Q) -> | |||||
case epqueue:pop(Q) of | |||||
undefined-> | |||||
ok; | |||||
{ok, _, _} -> | |||||
remove_item(Q) | |||||
end. | |||||
insert_none(0, _Max) -> | |||||
ok; | |||||
insert_none(N, Max) -> | |||||
rand:uniform(Max), | |||||
insert_none(N-1, Max). |
@ -0,0 +1,24 @@ | |||||
-module(multi_spawn). | |||||
-export([ | |||||
do_work/2 | |||||
]). | |||||
do_work(Fun, Count) -> | |||||
process_flag(trap_exit, true), | |||||
spawn_childrens(Fun, Count), | |||||
wait_responses(Count). | |||||
spawn_childrens(_Fun, 0) -> | |||||
ok; | |||||
spawn_childrens(Fun, Count) -> | |||||
spawn_link(Fun), | |||||
spawn_childrens(Fun, Count -1). | |||||
wait_responses(0) -> | |||||
ok; | |||||
wait_responses(Count) -> | |||||
receive | |||||
{'EXIT',_FromPid, _Reason} -> | |||||
wait_responses(Count -1) | |||||
end. |
@ -0,0 +1,29 @@ | |||||
{application, epqueue, [ | |||||
{description, "Erlang Priority Queue"}, | |||||
{licenses, ["MIT"]}, | |||||
{links,[{"Github","https://github.com/silviucpp/epqueue"}]}, | |||||
{vsn, "1.2.1"}, | |||||
{registered, []}, | |||||
{applications, [ | |||||
kernel, | |||||
stdlib | |||||
]}, | |||||
{env, []}, | |||||
{files, [ | |||||
"LICENSE*", | |||||
"*.MD", | |||||
"Makefile", | |||||
"rebar.config", | |||||
"rebar.lock", | |||||
"src/*.erl", | |||||
"src/*.src", | |||||
"c_src/*.h", | |||||
"c_src/*.cc", | |||||
"c_src/Makefile", | |||||
"c_src/nif.mk", | |||||
"test/*.erl", | |||||
"test/cover.spec", | |||||
"benchmarks/*.erl" | |||||
]} | |||||
]}. | |||||
@ -0,0 +1,60 @@ | |||||
-module(epqueue). | |||||
-export([ | |||||
new/0, | |||||
new/1, | |||||
size/1, | |||||
insert/3, | |||||
remove/2, | |||||
pop/1, | |||||
peek/1 | |||||
]). | |||||
-type error() :: badarg | {error, binary()}. | |||||
-type queue_option() :: {global_lock, boolean()}. | |||||
-type priority() :: non_neg_integer(). | |||||
-type data() :: any(). | |||||
-type queue_ref() :: reference(). | |||||
-type data_ref() :: reference(). | |||||
-spec new() -> | |||||
{ok, queue_ref()} | error(). | |||||
new() -> | |||||
epqueue_nif:new([]). | |||||
-spec new([queue_option()]) -> | |||||
{ok, queue_ref()} | error(). | |||||
new(Options) -> | |||||
epqueue_nif:new(Options). | |||||
-spec size(queue_ref()) -> | |||||
non_neg_integer() | badarg. | |||||
size(QueueRef) -> | |||||
epqueue_nif:size(QueueRef). | |||||
-spec insert(queue_ref(), data(), priority()) -> | |||||
{ok, data_ref()} | error(). | |||||
insert(QueueRef, Data, Priority) -> | |||||
epqueue_nif:insert(QueueRef, Data, Priority). | |||||
-spec remove(queue_ref(), data_ref()) -> | |||||
boolean() | error(). | |||||
remove(QueueRef, Ref) -> | |||||
epqueue_nif:remove(QueueRef, Ref). | |||||
-spec pop(queue_ref()) -> | |||||
{ok, data(), priority()} | error(). | |||||
pop(QueueRef) -> | |||||
epqueue_nif:pop(QueueRef). | |||||
-spec peek(queue_ref()) -> | |||||
{ok, data(), priority()} | error(). | |||||
peek(QueueRef) -> | |||||
epqueue_nif:peek(QueueRef). |
@ -0,0 +1,51 @@ | |||||
-module(epqueue_nif). | |||||
-define(NOT_LOADED, not_loaded(?LINE)). | |||||
-on_load(load_nif/0). | |||||
-export([ | |||||
new/1, | |||||
size/1, | |||||
insert/3, | |||||
remove/2, | |||||
pop/1, | |||||
peek/1 | |||||
]). | |||||
load_nif() -> | |||||
SoName = get_priv_path(?MODULE), | |||||
io:format(<<"Loading library: ~p ~n">>, [SoName]), | |||||
ok = erlang:load_nif(SoName, 0). | |||||
new(_Options) -> | |||||
?NOT_LOADED. | |||||
size(_QueueRef) -> | |||||
?NOT_LOADED. | |||||
insert(_QueueRef, _Data, _Priority) -> | |||||
?NOT_LOADED. | |||||
remove(_QueueRef, _Ref) -> | |||||
?NOT_LOADED. | |||||
pop(_QueueRef) -> | |||||
?NOT_LOADED. | |||||
peek(_QueueRef) -> | |||||
?NOT_LOADED. | |||||
% internals | |||||
get_priv_path(File) -> | |||||
case code:priv_dir(epqueue) of | |||||
{error, bad_name} -> | |||||
Ebin = filename:dirname(code:which(?MODULE)), | |||||
filename:join([filename:dirname(Ebin), "priv", File]); | |||||
Dir -> | |||||
filename:join(Dir, File) | |||||
end. | |||||
not_loaded(Line) -> | |||||
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}). |
@ -0,0 +1,8 @@ | |||||
{level, details}. | |||||
{incl_dirs, [ | |||||
"../_build/test/lib/epqueue/ebin" | |||||
]}. | |||||
{excl_mods, [ | |||||
]}. |
@ -0,0 +1,69 @@ | |||||
-module(integrity_test_SUITE). | |||||
-compile(export_all). | |||||
all() -> [ | |||||
{group, epqueue_group} | |||||
]. | |||||
groups() -> [ | |||||
{epqueue_group, [sequence], [ | |||||
basic_ops, | |||||
empty_queue, | |||||
non_empty_queue, | |||||
test_remove, | |||||
test_pop | |||||
]} | |||||
]. | |||||
init_per_suite(Config) -> | |||||
Config. | |||||
end_per_suite(_Config) -> | |||||
ok. | |||||
basic_ops(_Config) -> | |||||
{ok, Q} = epqueue:new([]), | |||||
{ok, _Ref} = epqueue:insert(Q, 1, 1), | |||||
{ok, 1, 1} = epqueue:peek(Q), | |||||
1 = epqueue:size(Q), | |||||
{ok, 1, 1} = epqueue:pop(Q), | |||||
0 = epqueue:size(Q), | |||||
ok. | |||||
empty_queue(_Config) -> | |||||
{ok, _} = epqueue:new([]), | |||||
ok. | |||||
non_empty_queue(_Config) -> | |||||
{ok, Q1} = epqueue:new([]), | |||||
{ok, _} = epqueue:insert(Q1, 1, 1), | |||||
{ok, _} = epqueue:insert(Q1, 2, 2), | |||||
{ok, _} = epqueue:insert(Q1, 3, 3), | |||||
{ok, _} = epqueue:insert(Q1, 4, 4), | |||||
{ok, _} = epqueue:insert(Q1, 5, 5), | |||||
ok. | |||||
test_remove(_Config) -> | |||||
{ok, Q1} = epqueue:new([]), | |||||
{ok, Q2} = epqueue:new([]), | |||||
{ok, Ref7} = epqueue:insert(Q2, 7, 7), | |||||
{ok, Ref3} = epqueue:insert(Q1, 3, 3), | |||||
{ok, Ref5} = epqueue:insert(Q1, 5, 5), | |||||
{ok, Ref1} = epqueue:insert(Q1, 1, 1), | |||||
false = epqueue:remove(Q1, Ref7), | |||||
true = epqueue:remove(Q1, Ref5), | |||||
true = epqueue:remove(Q1, Ref3), | |||||
true = epqueue:remove(Q1, Ref1), | |||||
true = epqueue:remove(Q2, Ref7), | |||||
ok. | |||||
test_pop(_Config) -> | |||||
{ok, Q1} = epqueue:new([]), | |||||
epqueue:insert(Q1, 3, 3), | |||||
epqueue:insert(Q1, 5, 5), | |||||
epqueue:insert(Q1, 1, 1), | |||||
{ok, 1, 1} = epqueue:pop(Q1), | |||||
{ok, 3, 3} = epqueue:pop(Q1), | |||||
{ok, 5, 5} = epqueue:pop(Q1), | |||||
ok. |