From 214c2d25702f5f1706b229894a735e205482d90e Mon Sep 17 00:00:00 2001 From: SisMaker <1713699517@qq.com> Date: Mon, 29 Apr 2024 01:26:38 +0800 Subject: [PATCH] =?UTF-8?q?ft:=20=E4=BC=98=E5=8C=96nif=E7=9A=84=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20=E6=89=B9=E9=87=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- c_src/eNifLock/eNifLock.c | 83 ---------- c_src/eNifLock/eNifLock.cc | 314 ++++++++++++++++++++++++++++++++++++ c_src/eNifLock/rebar.config | 2 +- include/eGLock.hrl | 7 +- src/eCLock.erl | 112 +++---------- src/eGLock.erl | 4 +- src/eNifLock.erl | 20 ++- 7 files changed, 359 insertions(+), 183 deletions(-) delete mode 100644 c_src/eNifLock/eNifLock.c create mode 100644 c_src/eNifLock/eNifLock.cc diff --git a/c_src/eNifLock/eNifLock.c b/c_src/eNifLock/eNifLock.c deleted file mode 100644 index de63ada..0000000 --- a/c_src/eNifLock/eNifLock.c +++ /dev/null @@ -1,83 +0,0 @@ -#include "erl_nif.h" -#include - -atomic_ullong LockSlot[1048576]; - -ERL_NIF_TERM atomTrue; -ERL_NIF_TERM atomFalse; -ERL_NIF_TERM atomUndefined; - -int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM){ - atomTrue = enif_make_atom(env, "true"); - atomFalse = enif_make_atom(env, "false"); - atomUndefined = enif_make_atom(env, "undefined"); - return 0; -} - -ERL_NIF_TERM tryLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){ - unsigned int KeyIx; - if (!enif_get_uint(env, argv[0], &KeyIx)){ - return enif_make_badarg(env); - } - ErlNifPid ThePid; - enif_self(env, &ThePid); - atomic_ullong Expected = 0; - atomic_ullong Val = (atomic_ullong)(ThePid.pid); - - if (atomic_compare_exchange_strong(&(LockSlot[KeyIx]), &Expected, Val)){ - return atomTrue; - }else{ - ThePid.pid = (ERL_NIF_TERM)Expected; - if (enif_is_process_alive(env, &ThePid)){ - return atomFalse; - }else{ - if (atomic_compare_exchange_strong(&LockSlot[KeyIx], &Expected, Val)) { - return atomTrue; - }else{ - return atomFalse; - } - } - } -} - -ERL_NIF_TERM releaseLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){ - unsigned int KeyIx; - if (!enif_get_uint(env, argv[0], &KeyIx)){ - return enif_make_badarg(env); - } - ErlNifPid ThePid; - enif_self(env, &ThePid); - atomic_ullong Expected = (atomic_ullong)(ThePid.pid); - atomic_ullong Val = 0; - - if (atomic_compare_exchange_strong(&LockSlot[KeyIx], &Expected, Val)){ - return atomTrue; - }else{ - return atomFalse; - } -} - -ERL_NIF_TERM getLockPid(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){ - unsigned int KeyIx; - if (!enif_get_uint(env, argv[0], &KeyIx)){ - return enif_make_badarg(env); - } - ErlNifPid ThePid; - - atomic_ullong Var = atomic_load(&(LockSlot[KeyIx])); - if (Var > 0){ - ThePid.pid = (ERL_NIF_TERM)Var; - return enif_make_pid(env, &ThePid); - }else{ - return atomUndefined; - } -} - -static ErlNifFunc nifFuncs[] = - { - {"tryLock", 1, tryLock}, - {"releaseLock", 1, releaseLock}, - {"getLockPid", 1, getLockPid} -}; - -ERL_NIF_INIT(eNifLock, nifFuncs, &nifLoad, NULL, NULL, NULL) \ No newline at end of file diff --git a/c_src/eNifLock/eNifLock.cc b/c_src/eNifLock/eNifLock.cc new file mode 100644 index 0000000..af4da65 --- /dev/null +++ b/c_src/eNifLock/eNifLock.cc @@ -0,0 +1,314 @@ +#include "erl_nif.h" +#include +#include +#include +#include +#include +#include +using namespace std; + +const int LockSize = 2097152; +const int ScheduleCnt = 50; +const int HashSalt = 786234121; +atomic LockSlot[LockSize]; + +ERL_NIF_TERM atomTrue; +ERL_NIF_TERM atomFalse; +ERL_NIF_TERM atomUndefined; + +int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM) +{ + atomTrue = enif_make_atom(env, "true"); + atomFalse = enif_make_atom(env, "false"); + atomUndefined = enif_make_atom(env, "undefined"); + return 0; +} + +bool isNotCurLocked(forward_list *locked, int KeyIx) +{ + for (auto it : *locked) + { + if (it == KeyIx) + return false; + } + return true; +} + +bool lockOne(ErlNifEnv *env, ErlNifPid *ThePid, int KeyIx, uint64_t Val) +{ + uint64_t Expected = 0; + if (LockSlot[KeyIx].compare_exchange_strong(Expected, Val)) + { + return true; + } + else + { + ThePid->pid = (ERL_NIF_TERM)Expected; + if (enif_is_process_alive(env, ThePid)) + { + return false; + } + else + { + if (LockSlot[KeyIx].compare_exchange_strong(Expected, Val)) + { + return true; + } + else + { + return false; + } + } + } +} + +ERL_NIF_TERM tryLockSchedule(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) +{ + //enif_fprintf(stdout, "IMY************tryLocksSchedule00 %T \n", argv[0]); + set locked; + ERL_NIF_TERM allList = argv[1]; + ERL_NIF_TERM head; + int KeyIx; + while (enif_get_list_cell(env, allList, &head, &allList)) + { + enif_get_int(env, head, &KeyIx); + locked.insert(KeyIx); + } + + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Val = (uint64_t)(ThePid.pid); + allList = argv[0]; + //enif_fprintf(stdout, "IMY************tryLocksSchedule222 %T \n", allList); + auto search = locked.end(); + while (enif_get_list_cell(env, allList, &head, &allList)) + { + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize; + //enif_fprintf(stdout, "IMY************tryLocksSchedule222000 %d \n", KeyIx); + search = locked.find(KeyIx); + if (search != locked.end()) + { + if (!lockOne(env, &ThePid, KeyIx, Val)) + { + goto Failed; + } + else + { + locked.insert(KeyIx); + } + } + } + //enif_fprintf(stdout, "IMY************tryLocksSchedule333 %T \n", allList); + return atomTrue; + +Failed: +{ + uint64_t RExpected; + for (auto it : locked) + { + RExpected = Val; + LockSlot[it].compare_exchange_strong(RExpected, 0); + } + return atomFalse; +} +} + +ERL_NIF_TERM tryLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) +{ + if (enif_is_list(env, argv[0])) + { + // enif_fprintf(stdout, "IMY************tryLocks00 %T \n", argv[0]); + ERL_NIF_TERM allList = argv[0]; + ERL_NIF_TERM head; + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Val = (uint64_t)(ThePid.pid); + int KeyIx; + //forward_list locked(15); + //list locked; + set locked; + // map locked; + unsigned int cnt = 0; + auto search = locked.end(); + // enif_fprintf(stdout, "IMY************tryLocks222 %T \n", allList); + while (enif_get_list_cell(env, allList, &head, &allList)) + { + // enif_fprintf(stdout, "IMY************tryLocks222---- %T %T\n", head, allList); + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize; + // enif_fprintf(stdout, "IMY************tryLocks222000 %d \n", KeyIx); + search = locked.find(KeyIx); + if (search != locked.end()) + //if (isNotCurLocked(&locked, KeyIx)) + { + if (!lockOne(env, &ThePid, KeyIx, Val)) + { + goto Failed; + } + else + { + // enif_fprintf(stdout, "IMY************tryLocks222111 %d \n", KeyIx); + //locked.push_front(KeyIx); + //locked.push_back(KeyIx); + locked.insert(KeyIx); + //locked.insert({KeyIx, 0}); + cnt++; + // enif_fprintf(stdout, "IMY************tryLocks222222 %d \n", cnt); + if (cnt >= ScheduleCnt) + { + goto Schedule; + } + } + } + } + // enif_fprintf(stdout, "IMY************tryLocks333 %T \n", allList); + return atomTrue; + + Schedule: + { + ERL_NIF_TERM ListLocked = enif_make_list(env, 0); + //for (auto it : locked) + // { + // enif_make_list_cell(env, enif_make_int(env, it), ListLocked); + //} + + ERL_NIF_TERM newargv[2] = {allList, ListLocked}; + //enif_fprintf(stdout, "IMY************tryLocks4444 %T \n", newargv); + return enif_schedule_nif(env, "tryLockSchedule", 0, &tryLockSchedule, 2, newargv); + } + + Failed: + { + //uint64_t RExpected; + //for (auto it : locked) + // { +//RExpected = Val; + // LockSlot[it].compare_exchange_strong(RExpected, 0); + // } + return atomFalse; + } +} +else +{ + int KeyIx; + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize; + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Val = (uint64_t)(ThePid.pid); + + if (lockOne(env, &ThePid, KeyIx, Val)) + { + return atomTrue; + } + else + { + return atomFalse; + } +} +} + +ERL_NIF_TERM releaseLockSchedule(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM allList = argv[0]; + ERL_NIF_TERM head; + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Expected = (uint64_t)(ThePid.pid); + uint64_t RExpected; + int KeyIx; + int isAllOk = 1; + enif_get_int(env, argv[1], &isAllOk); + + while (enif_get_list_cell(env, allList, &head, &allList)) + { + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize; + RExpected = Expected; + if (!LockSlot[KeyIx].compare_exchange_strong(RExpected, 0)) + { + isAllOk = 0; + } + } + return isAllOk > 0 ? atomTrue : atomFalse; +} + +ERL_NIF_TERM releaseLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) +{ + if (enif_is_list(env, argv[0])) + { + + ERL_NIF_TERM allList = argv[0]; + ERL_NIF_TERM head; + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Expected = (uint64_t)(ThePid.pid); + uint64_t RExpected; + int KeyIx; + unsigned int cnt = 0; + int isAllOk = 1; + + while (enif_get_list_cell(env, allList, &head, &allList)) + { + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize; + RExpected = Expected; + if (!LockSlot[KeyIx].compare_exchange_strong(RExpected, 0)) + { + isAllOk = 0; + } + cnt++; + if (cnt >= ScheduleCnt) + { + goto Schedule; + } + } + return isAllOk > 0 ? atomTrue : atomFalse; + + Schedule: + { + ERL_NIF_TERM ArgvAllOk = enif_make_int(env, isAllOk); + ERL_NIF_TERM newargv[2] = {allList, ArgvAllOk}; + //enif_fprintf(stdout, "IMY************tryLocks4444 %T \n", newargv); + return enif_schedule_nif(env, "releaseLockSchedule", ERL_NIF_DIRTY_JOB_CPU_BOUND, &releaseLockSchedule, 2, newargv); + } +} +else +{ + int KeyIx; + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize; + ErlNifPid ThePid; + enif_self(env, &ThePid); + uint64_t Expected = (uint64_t)(ThePid.pid); + + if (LockSlot[KeyIx].compare_exchange_strong(Expected, 0)) + { + return atomTrue; + } + else + { + return atomFalse; + } +} +} + +ERL_NIF_TERM getLockPid(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]) +{ + int KeyIx; + KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize; + ErlNifPid ThePid; + + uint64_t Var = LockSlot[KeyIx].load(); + if (Var > 0) + { + ThePid.pid = (ERL_NIF_TERM)Var; + return enif_make_pid(env, &ThePid); + } + else + { + return atomUndefined; + } +} + +static ErlNifFunc nifFuncs[] = { + {"tryLock", 1, tryLock}, + {"releaseLock", 1, releaseLock}, + {"getLockPid", 1, getLockPid}}; + +ERL_NIF_INIT(eNifLock, nifFuncs, &nifLoad, NULL, NULL, NULL) \ No newline at end of file diff --git a/c_src/eNifLock/rebar.config b/c_src/eNifLock/rebar.config index f04da96..e27bd7b 100644 --- a/c_src/eNifLock/rebar.config +++ b/c_src/eNifLock/rebar.config @@ -1,5 +1,5 @@ {port_specs, [ - {"../../priv/eNifLock.so", ["*.c"]} + {"../../priv/eNifLock.so", ["*.cc"]} ]}. diff --git a/include/eGLock.hrl b/include/eGLock.hrl index a6ac64b..400a85b 100644 --- a/include/eGLock.hrl +++ b/include/eGLock.hrl @@ -1,9 +1,12 @@ %% 默认超时时间单位:Ms -define(LockTimeOut, 5000). %% 超时重试时间单位:Ms --define(ReTryTime, 10). +-define(ReTryTime, 3). %% 数组数量 --define(eGLockSize, 1048576). +-define(eGLockBatch, 15). + +%% 数组数量 +-define(eGLockSize, 2097152). %% atomics索引 -define(eGLockRef, eGLockRef). diff --git a/src/eCLock.erl b/src/eCLock.erl index 8ddb17e..6bc9188 100644 --- a/src/eCLock.erl +++ b/src/eCLock.erl @@ -7,6 +7,7 @@ tryLock/1 , tryLock/2 , releaseLock/1 + , getLockPid/1 , lockApply/2 , lockApply/3 ]). @@ -16,74 +17,45 @@ tryLock(KeyOrKeys) -> tryLock(KeyOrKeys, ?LockTimeOut). tryLock(KeyOrKeys, TimeOut) -> - case is_list(KeyOrKeys) of - true -> - KeyIxs = getKexIxs(KeyOrKeys, []), - doTryLocks(KeyIxs, TimeOut); - _ -> - doTryLock(erlang:phash2(KeyOrKeys, ?eGLockSize), TimeOut) - end. + doTryLock(KeyOrKeys, TimeOut). -doTryLock(KeyIx, TimeOut) -> - case eNifLock:tryLock(KeyIx) of +doTryLock(KeyOrKeys, TimeOut) -> + case eNifLock:tryLock(KeyOrKeys) of true -> true; _ -> - loopLock(KeyIx, TimeOut) + loopLock(KeyOrKeys, TimeOut) end. -loopLock(KeyIx, TimeOut) -> +loopLock(KeyOrKeys, TimeOut) -> receive after ?ReTryTime -> LTimeOut = ?CASE(TimeOut == infinity, TimeOut, TimeOut - ?ReTryTime), case LTimeOut >= 0 of true -> - case eNifLock:tryLock(KeyIx) of + case eNifLock:tryLock(KeyOrKeys) of true -> true; _ -> - loopLock(KeyIx, LTimeOut) + loopLock(KeyOrKeys, LTimeOut) end; _ -> ltimeout end end. -doTryLocks(KeyIxs, TimeOut) -> - case tryLockAll(KeyIxs, []) of - true -> - true; - _ -> - loopLocks(KeyIxs, TimeOut) - end. - -loopLocks(KeyIxs, TimeOut) -> - receive - after ?ReTryTime -> - LTimeOut = ?CASE(TimeOut == infinity, TimeOut, TimeOut - ?ReTryTime), - case LTimeOut >= 0 of - true -> - case tryLockAll(KeyIxs, []) of - true -> - true; - _ -> - loopLocks(KeyIxs, LTimeOut) - end; - _ -> - ltimeout - end - end. -spec releaseLock(KeyOrKeys :: term() | [term()]) -> ok. releaseLock(KeyOrKeys) -> + eNifLock:releaseLocks(KeyOrKeys). + +-spec getLockPid(KeyOrKeys :: term() | [term()]) -> ok. +getLockPid(KeyOrKeys) -> case is_list(KeyOrKeys) of true -> - KeyIxs = getKexIxs(KeyOrKeys, []), - [eNifLock:releaseLock(OneKeyIx) || OneKeyIx <- KeyIxs], - ok; + [{OneKey, eNifLock:getLockPid(OneKey)} || OneKey <- KeyOrKeys]; _ -> - eNifLock:releaseLock(erlang:phash2(KeyOrKeys, ?eGLockSize)), - ok + {KeyOrKeys, eNifLock:getLockPid(KeyOrKeys)} end. -spec lockApply(KeyOrKeys :: term() | [term()], MFAOrFun :: {M :: atom(), F :: atom(), Args :: list()} | {Fun :: function(), Args :: list()}) -> term() | {error, ltimeout} | {error, {lock_apply_error, term()}}. @@ -92,55 +64,21 @@ lockApply(KeyOrKeys, MFAOrFun) -> -spec lockApply(KeyOrKeys :: term() | [term()], MFAOrFun :: {M :: atom(), F :: atom(), Args :: list()} | {Fun :: function(), Args :: list()}, TimeOut :: integer() | infinity) -> term(). lockApply(KeyOrKeys, MFAOrFun, TimeOut) -> - case is_list(KeyOrKeys) of + case doTryLock(KeyOrKeys, TimeOut) of true -> - KeyIxs = getKexIxs(KeyOrKeys, []), - case doTryLocks(KeyIxs, TimeOut) of - true -> - try doApply(MFAOrFun) - catch C:R:S -> - {error, {lock_apply_error, {C, R, S}}} - after - [eNifLock:releaseLock(OneKeyIx) || OneKeyIx <- KeyIxs], - ok - end; - ltimeout -> - {error, ltimeout} + try doApply(MFAOrFun) + catch C:R:S -> + {error, {lock_apply_error, {C, R, S}}} + after + eNifLock:releaseLock(KeyOrKeys), + ok end; - _ -> - KeyIx = erlang:phash2(KeyOrKeys, ?eGLockSize), - case doTryLock(KeyIx, TimeOut) of - true -> - try doApply(MFAOrFun) - catch C:R:S -> - {error, {lock_apply_error, {C, R, S}}} - after - eNifLock:releaseLock(KeyIx), - ok - end; - ltimeout -> - {error, ltimeout} - end - end. - -getKexIxs([], IxAcc) -> IxAcc; -getKexIxs([Key | Keys], IxAcc) -> - KeyIx = erlang:phash2(Key, ?eGLockSize), - getKexIxs(Keys, ?CASE(lists:member(KeyIx, IxAcc), IxAcc, [KeyIx | IxAcc])). - -tryLockAll([], _LockAcc) -> - true; -tryLockAll([KeyIx | KeyIxs], LockAcc) -> - case eNifLock:tryLock(KeyIx) of - true -> - tryLockAll(KeyIxs, [KeyIx | LockAcc]); - _ -> - [eNifLock:releaseLock(OneLock) || OneLock <- LockAcc], - false + ltimeout -> + {error, ltimeout} end. -doApply({M, F, A}) -> - apply(M, F, A); +doApply({M, F, Args}) -> + apply(M, F, Args); doApply({Fun, Args}) -> apply(Fun, Args). diff --git a/src/eGLock.erl b/src/eGLock.erl index ec6f5fe..47c9e48 100644 --- a/src/eGLock.erl +++ b/src/eGLock.erl @@ -214,8 +214,8 @@ tryLockAll([KeyIx | KeyIxs], ALockRef, PidInt, LockAcc) -> false end. -doApply({M, F, A}) -> - apply(M, F, A); +doApply({M, F, Args}) -> + apply(M, F, Args); doApply({Fun, Args}) -> apply(Fun, Args). diff --git a/src/eNifLock.erl b/src/eNifLock.erl index 491445d..273d556 100644 --- a/src/eNifLock.erl +++ b/src/eNifLock.erl @@ -1,6 +1,10 @@ -module(eNifLock). --export([tryLock/1, releaseLock/1, getLockPid/1]). +-export([ + tryLock/1 + , releaseLock/1 + , getLockPid/1 +]). -on_load(init/0). @@ -19,14 +23,14 @@ init() -> end, erlang:load_nif(SoName, 0). --spec tryLock(KeyIx :: non_neg_integer()) -> true | false. -tryLock(_KeyIx) -> +-spec tryLock(_KeyOrKeys :: term() | [term()]) -> true | false. +tryLock(_KeyOrKeys) -> erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}). --spec releaseLock(KeyIx :: non_neg_integer()) -> true | false. -releaseLock(_KeyIx) -> +-spec releaseLock(_KeyOrKeys :: term() | [term()]) -> true | false. +releaseLock(_KeyOrKeys) -> erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}). --spec getLockPid(KeyIx :: non_neg_integer()) -> pid() | undefined. -getLockPid(_KeyIx) -> - erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}). \ No newline at end of file +-spec getLockPid(OneKey :: term()) -> pid() | undefined. +getLockPid(_OneKey) -> + erlang:nif_error({not_loaded, [{module, ?MODULE}, {line1, ?LINE}]}).