瀏覽代碼

ft: 基础的队列-线程池模型

master
lijie 2 年之前
父節點
當前提交
1f99f4bddd
共有 3 個檔案被更改,包括 92 行新增4 行删除
  1. +92
    -4
      c_src/HJpsNif/eHJps.cc
  2. +0
    -0
      c_src/HJpsNif/worker.cc
  3. 二進制
      priv/eHJps.so

+ 92
- 4
c_src/HJpsNif/eHJps.cc 查看文件

@ -1,10 +1,15 @@
#include<stdio.h>
#include<stdlib.h>
#include<stdbool.h>
#include<stdatomic.h>
#include<atomic>
#include<vector>
#include<erl_nif.h>
#include "eHJpsAtom.h"
#include "blockingconcurrentqueue.h"
static void *UpdateMapFun(void *obj);
static void *FindPathFun(void *obj);
struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits {
static const size_t BLOCK_SIZE = 16;
@ -15,11 +20,23 @@ struct NifTraits : public moodycamel::ConcurrentQueueDefaultTraits {
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
static inline void *malloc(std::size_t size) { return enif_alloc(size); }
static inline void free(void *ptr) { enif_free(ptr); }
};
using lfqIns = moodycamel::ConcurrentQueue<ErlNifBinary, NifTraits> *;
typedef struct UpdateMapData_r {
bool IsBarrier;
} UpdateMapData;
typedef struct FindPathData_r {
bool IsBarrier;
} FindPathData;
using LfqUpdateMap = moodycamel::BlockingConcurrentQueue<UpdateMapData, NifTraits> *;
using LfqFindPath = moodycamel::BlockingConcurrentQueue<FindPathData, NifTraits> *;
LfqUpdateMap QUpdateMap;
LfqFindPath QFindPath;
const size_t BulkDelCnt = 200;
@ -34,6 +51,24 @@ int nifLoad(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) {
enif_fprintf(stdout, "IMY*************nifload00001\n");
*priv_data = NULL;
QUpdateMap = new moodycamel::BlockingConcurrentQueue<UpdateMapData, NifTraits>;
QFindPath = new moodycamel::BlockingConcurrentQueue<FindPathData, NifTraits>;
ErlNifTid Tid;
if (0 != enif_thread_create((char *)"ThreadUpdateMap", &Tid, UpdateMapFun, NULL, NULL)) {
return -1;
}
if (0 != enif_thread_create((char *)"FindPathFun", &Tid, FindPathFun, NULL, NULL)) {
return -1;
}
if (0 != enif_thread_create((char *)"FindPathFun", &Tid, FindPathFun, NULL, NULL)) {
return -1;
}
if (0 != enif_thread_create((char *)"FindPathFun", &Tid, FindPathFun, NULL, NULL)) {
return -1;
}
return 0;
}
@ -49,7 +84,60 @@ void nifUnload(ErlNifEnv* env, void* priv_data) {
return;
}
ERL_NIF_TERM nifInitMap(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
return atom_ok;
}
ERL_NIF_TERM nifUpdateMap(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
UpdateMapData InsertData = {true};
if (QUpdateMap->enqueue(InsertData)) {
return atom_true;
} else {
return atom_false;
}
}
ERL_NIF_TERM nifFindPath(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) {
FindPathData InsertData = {true};
if (QFindPath->enqueue(InsertData)) {
return atom_true;
} else {
return atom_false;
}
}
static ErlNifFunc nifFuns[] = {
{"initMap", 1, nifInitMap},
{"updateMap", 3, nifUpdateMap},
{"findPath", 5, nifFindPath}
};
ERL_NIF_INIT(eQuic, nifFuns, nifLoad, NULL, nifUpgrade, nifUnload)
ERL_NIF_INIT(eHJps, nifFuns, nifLoad, NULL, nifUpgrade, nifUnload)
static void *UpdateMapFun(void *obj) {
return (void *) 0; // 线程处理结束,退出
}
static void *FindPathFun(void *obj) {
FindPathData Data;
ErlNifTid Tid = enif_thread_self();
while (true)
{
QFindPath->wait_dequeue(Data);
enif_fprintf(stdout, "IMY*************listenerResClose5 %T %d\n", Tid, Data.IsBarrier);
}
return (void *) 0; // 线程处理结束,退出
}
void eLfqFree(ErlNifEnv *, void *obj) {
std::vector <FindPathData> DataList(BulkDelCnt);
size_t OutSize;
do{
OutSize = QFindPath->try_dequeue_bulk(DataList.begin(), DataList.size());
}while(OutSize >= BulkDelCnt);
}

+ 0
- 0
c_src/HJpsNif/worker.cc 查看文件


二進制
priv/eHJps.so 查看文件


Loading…
取消
儲存