Quellcode durchsuchen

ft: 优化nif的实现 批量锁

master
SisMaker vor 1 Jahr
Ursprung
Commit
214c2d2570
7 geänderte Dateien mit 359 neuen und 183 gelöschten Zeilen
  1. +0
    -83
      c_src/eNifLock/eNifLock.c
  2. +314
    -0
      c_src/eNifLock/eNifLock.cc
  3. +1
    -1
      c_src/eNifLock/rebar.config
  4. +5
    -2
      include/eGLock.hrl
  5. +25
    -87
      src/eCLock.erl
  6. +2
    -2
      src/eGLock.erl
  7. +12
    -8
      src/eNifLock.erl

+ 0
- 83
c_src/eNifLock/eNifLock.c Datei anzeigen

@ -1,83 +0,0 @@
#include "erl_nif.h"
#include <stdatomic.h>
atomic_ullong LockSlot[1048576];
ERL_NIF_TERM atomTrue;
ERL_NIF_TERM atomFalse;
ERL_NIF_TERM atomUndefined;
int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM){
atomTrue = enif_make_atom(env, "true");
atomFalse = enif_make_atom(env, "false");
atomUndefined = enif_make_atom(env, "undefined");
return 0;
}
ERL_NIF_TERM tryLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){
unsigned int KeyIx;
if (!enif_get_uint(env, argv[0], &KeyIx)){
return enif_make_badarg(env);
}
ErlNifPid ThePid;
enif_self(env, &ThePid);
atomic_ullong Expected = 0;
atomic_ullong Val = (atomic_ullong)(ThePid.pid);
if (atomic_compare_exchange_strong(&(LockSlot[KeyIx]), &Expected, Val)){
return atomTrue;
}else{
ThePid.pid = (ERL_NIF_TERM)Expected;
if (enif_is_process_alive(env, &ThePid)){
return atomFalse;
}else{
if (atomic_compare_exchange_strong(&LockSlot[KeyIx], &Expected, Val)) {
return atomTrue;
}else{
return atomFalse;
}
}
}
}
ERL_NIF_TERM releaseLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){
unsigned int KeyIx;
if (!enif_get_uint(env, argv[0], &KeyIx)){
return enif_make_badarg(env);
}
ErlNifPid ThePid;
enif_self(env, &ThePid);
atomic_ullong Expected = (atomic_ullong)(ThePid.pid);
atomic_ullong Val = 0;
if (atomic_compare_exchange_strong(&LockSlot[KeyIx], &Expected, Val)){
return atomTrue;
}else{
return atomFalse;
}
}
ERL_NIF_TERM getLockPid(ErlNifEnv *env, int, const ERL_NIF_TERM argv[]){
unsigned int KeyIx;
if (!enif_get_uint(env, argv[0], &KeyIx)){
return enif_make_badarg(env);
}
ErlNifPid ThePid;
atomic_ullong Var = atomic_load(&(LockSlot[KeyIx]));
if (Var > 0){
ThePid.pid = (ERL_NIF_TERM)Var;
return enif_make_pid(env, &ThePid);
}else{
return atomUndefined;
}
}
static ErlNifFunc nifFuncs[] =
{
{"tryLock", 1, tryLock},
{"releaseLock", 1, releaseLock},
{"getLockPid", 1, getLockPid}
};
ERL_NIF_INIT(eNifLock, nifFuncs, &nifLoad, NULL, NULL, NULL)

+ 314
- 0
c_src/eNifLock/eNifLock.cc Datei anzeigen

@ -0,0 +1,314 @@
#include "erl_nif.h"
#include <atomic>
#include <forward_list>
#include <list>
#include <set>
#include <map>
#include <unordered_set>
using namespace std;
const int LockSize = 2097152;
const int ScheduleCnt = 50;
const int HashSalt = 786234121;
atomic<uint64_t> LockSlot[LockSize];
ERL_NIF_TERM atomTrue;
ERL_NIF_TERM atomFalse;
ERL_NIF_TERM atomUndefined;
int nifLoad(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM)
{
atomTrue = enif_make_atom(env, "true");
atomFalse = enif_make_atom(env, "false");
atomUndefined = enif_make_atom(env, "undefined");
return 0;
}
bool isNotCurLocked(forward_list<int> *locked, int KeyIx)
{
for (auto it : *locked)
{
if (it == KeyIx)
return false;
}
return true;
}
bool lockOne(ErlNifEnv *env, ErlNifPid *ThePid, int KeyIx, uint64_t Val)
{
uint64_t Expected = 0;
if (LockSlot[KeyIx].compare_exchange_strong(Expected, Val))
{
return true;
}
else
{
ThePid->pid = (ERL_NIF_TERM)Expected;
if (enif_is_process_alive(env, ThePid))
{
return false;
}
else
{
if (LockSlot[KeyIx].compare_exchange_strong(Expected, Val))
{
return true;
}
else
{
return false;
}
}
}
}
ERL_NIF_TERM tryLockSchedule(ErlNifEnv *env, int, const ERL_NIF_TERM argv[])
{
//enif_fprintf(stdout, "IMY************tryLocksSchedule00 %T \n", argv[0]);
set<int> locked;
ERL_NIF_TERM allList = argv[1];
ERL_NIF_TERM head;
int KeyIx;
while (enif_get_list_cell(env, allList, &head, &allList))
{
enif_get_int(env, head, &KeyIx);
locked.insert(KeyIx);
}
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Val = (uint64_t)(ThePid.pid);
allList = argv[0];
//enif_fprintf(stdout, "IMY************tryLocksSchedule222 %T \n", allList);
auto search = locked.end();
while (enif_get_list_cell(env, allList, &head, &allList))
{
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize;
//enif_fprintf(stdout, "IMY************tryLocksSchedule222000 %d \n", KeyIx);
search = locked.find(KeyIx);
if (search != locked.end())
{
if (!lockOne(env, &ThePid, KeyIx, Val))
{
goto Failed;
}
else
{
locked.insert(KeyIx);
}
}
}
//enif_fprintf(stdout, "IMY************tryLocksSchedule333 %T \n", allList);
return atomTrue;
Failed:
{
uint64_t RExpected;
for (auto it : locked)
{
RExpected = Val;
LockSlot[it].compare_exchange_strong(RExpected, 0);
}
return atomFalse;
}
}
ERL_NIF_TERM tryLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[])
{
if (enif_is_list(env, argv[0]))
{
// enif_fprintf(stdout, "IMY************tryLocks00 %T \n", argv[0]);
ERL_NIF_TERM allList = argv[0];
ERL_NIF_TERM head;
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Val = (uint64_t)(ThePid.pid);
int KeyIx;
//forward_list<int> locked(15);
//list<int> locked;
set<int> locked;
// map<int, int> locked;
unsigned int cnt = 0;
auto search = locked.end();
// enif_fprintf(stdout, "IMY************tryLocks222 %T \n", allList);
while (enif_get_list_cell(env, allList, &head, &allList))
{
// enif_fprintf(stdout, "IMY************tryLocks222---- %T %T\n", head, allList);
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize;
// enif_fprintf(stdout, "IMY************tryLocks222000 %d \n", KeyIx);
search = locked.find(KeyIx);
if (search != locked.end())
//if (isNotCurLocked(&locked, KeyIx))
{
if (!lockOne(env, &ThePid, KeyIx, Val))
{
goto Failed;
}
else
{
// enif_fprintf(stdout, "IMY************tryLocks222111 %d \n", KeyIx);
//locked.push_front(KeyIx);
//locked.push_back(KeyIx);
locked.insert(KeyIx);
//locked.insert({KeyIx, 0});
cnt++;
// enif_fprintf(stdout, "IMY************tryLocks222222 %d \n", cnt);
if (cnt >= ScheduleCnt)
{
goto Schedule;
}
}
}
}
// enif_fprintf(stdout, "IMY************tryLocks333 %T \n", allList);
return atomTrue;
Schedule:
{
ERL_NIF_TERM ListLocked = enif_make_list(env, 0);
//for (auto it : locked)
// {
// enif_make_list_cell(env, enif_make_int(env, it), ListLocked);
//}
ERL_NIF_TERM newargv[2] = {allList, ListLocked};
//enif_fprintf(stdout, "IMY************tryLocks4444 %T \n", newargv);
return enif_schedule_nif(env, "tryLockSchedule", 0, &tryLockSchedule, 2, newargv);
}
Failed:
{
//uint64_t RExpected;
//for (auto it : locked)
// {
//RExpected = Val;
// LockSlot[it].compare_exchange_strong(RExpected, 0);
// }
return atomFalse;
}
}
else
{
int KeyIx;
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize;
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Val = (uint64_t)(ThePid.pid);
if (lockOne(env, &ThePid, KeyIx, Val))
{
return atomTrue;
}
else
{
return atomFalse;
}
}
}
ERL_NIF_TERM releaseLockSchedule(ErlNifEnv *env, int, const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM allList = argv[0];
ERL_NIF_TERM head;
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Expected = (uint64_t)(ThePid.pid);
uint64_t RExpected;
int KeyIx;
int isAllOk = 1;
enif_get_int(env, argv[1], &isAllOk);
while (enif_get_list_cell(env, allList, &head, &allList))
{
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize;
RExpected = Expected;
if (!LockSlot[KeyIx].compare_exchange_strong(RExpected, 0))
{
isAllOk = 0;
}
}
return isAllOk > 0 ? atomTrue : atomFalse;
}
ERL_NIF_TERM releaseLock(ErlNifEnv *env, int, const ERL_NIF_TERM argv[])
{
if (enif_is_list(env, argv[0]))
{
ERL_NIF_TERM allList = argv[0];
ERL_NIF_TERM head;
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Expected = (uint64_t)(ThePid.pid);
uint64_t RExpected;
int KeyIx;
unsigned int cnt = 0;
int isAllOk = 1;
while (enif_get_list_cell(env, allList, &head, &allList))
{
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, head, HashSalt) % LockSize;
RExpected = Expected;
if (!LockSlot[KeyIx].compare_exchange_strong(RExpected, 0))
{
isAllOk = 0;
}
cnt++;
if (cnt >= ScheduleCnt)
{
goto Schedule;
}
}
return isAllOk > 0 ? atomTrue : atomFalse;
Schedule:
{
ERL_NIF_TERM ArgvAllOk = enif_make_int(env, isAllOk);
ERL_NIF_TERM newargv[2] = {allList, ArgvAllOk};
//enif_fprintf(stdout, "IMY************tryLocks4444 %T \n", newargv);
return enif_schedule_nif(env, "releaseLockSchedule", ERL_NIF_DIRTY_JOB_CPU_BOUND, &releaseLockSchedule, 2, newargv);
}
}
else
{
int KeyIx;
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize;
ErlNifPid ThePid;
enif_self(env, &ThePid);
uint64_t Expected = (uint64_t)(ThePid.pid);
if (LockSlot[KeyIx].compare_exchange_strong(Expected, 0))
{
return atomTrue;
}
else
{
return atomFalse;
}
}
}
ERL_NIF_TERM getLockPid(ErlNifEnv *env, int, const ERL_NIF_TERM argv[])
{
int KeyIx;
KeyIx = enif_hash(ERL_NIF_INTERNAL_HASH, argv[0], HashSalt) % LockSize;
ErlNifPid ThePid;
uint64_t Var = LockSlot[KeyIx].load();
if (Var > 0)
{
ThePid.pid = (ERL_NIF_TERM)Var;
return enif_make_pid(env, &ThePid);
}
else
{
return atomUndefined;
}
}
static ErlNifFunc nifFuncs[] = {
{"tryLock", 1, tryLock},
{"releaseLock", 1, releaseLock},
{"getLockPid", 1, getLockPid}};
ERL_NIF_INIT(eNifLock, nifFuncs, &nifLoad, NULL, NULL, NULL)

+ 1
- 1
c_src/eNifLock/rebar.config Datei anzeigen

@ -1,5 +1,5 @@
{port_specs, [
{"../../priv/eNifLock.so", ["*.c"]}
{"../../priv/eNifLock.so", ["*.cc"]}
]}.

+ 5
- 2
include/eGLock.hrl Datei anzeigen

@ -1,9 +1,12 @@
%% :Ms
-define(LockTimeOut, 5000).
%% :Ms
-define(ReTryTime, 10).
-define(ReTryTime, 3).
%%
-define(eGLockSize, 1048576).
-define(eGLockBatch, 15).
%%
-define(eGLockSize, 2097152).
%% atomics索引
-define(eGLockRef, eGLockRef).

+ 25
- 87
src/eCLock.erl Datei anzeigen

@ -7,6 +7,7 @@
tryLock/1
, tryLock/2
, releaseLock/1
, getLockPid/1
, lockApply/2
, lockApply/3
]).
@ -16,74 +17,45 @@ tryLock(KeyOrKeys) ->
tryLock(KeyOrKeys, ?LockTimeOut).
tryLock(KeyOrKeys, TimeOut) ->
case is_list(KeyOrKeys) of
true ->
KeyIxs = getKexIxs(KeyOrKeys, []),
doTryLocks(KeyIxs, TimeOut);
_ ->
doTryLock(erlang:phash2(KeyOrKeys, ?eGLockSize), TimeOut)
end.
doTryLock(KeyOrKeys, TimeOut).
doTryLock(KeyIx, TimeOut) ->
case eNifLock:tryLock(KeyIx) of
doTryLock(KeyOrKeys, TimeOut) ->
case eNifLock:tryLock(KeyOrKeys) of
true ->
true;
_ ->
loopLock(KeyIx, TimeOut)
loopLock(KeyOrKeys, TimeOut)
end.
loopLock(KeyIx, TimeOut) ->
loopLock(KeyOrKeys, TimeOut) ->
receive
after ?ReTryTime ->
LTimeOut = ?CASE(TimeOut == infinity, TimeOut, TimeOut - ?ReTryTime),
case LTimeOut >= 0 of
true ->
case eNifLock:tryLock(KeyIx) of
case eNifLock:tryLock(KeyOrKeys) of
true ->
true;
_ ->
loopLock(KeyIx, LTimeOut)
loopLock(KeyOrKeys, LTimeOut)
end;
_ ->
ltimeout
end
end.
doTryLocks(KeyIxs, TimeOut) ->
case tryLockAll(KeyIxs, []) of
true ->
true;
_ ->
loopLocks(KeyIxs, TimeOut)
end.
loopLocks(KeyIxs, TimeOut) ->
receive
after ?ReTryTime ->
LTimeOut = ?CASE(TimeOut == infinity, TimeOut, TimeOut - ?ReTryTime),
case LTimeOut >= 0 of
true ->
case tryLockAll(KeyIxs, []) of
true ->
true;
_ ->
loopLocks(KeyIxs, LTimeOut)
end;
_ ->
ltimeout
end
end.
-spec releaseLock(KeyOrKeys :: term() | [term()]) -> ok.
releaseLock(KeyOrKeys) ->
eNifLock:releaseLocks(KeyOrKeys).
-spec getLockPid(KeyOrKeys :: term() | [term()]) -> ok.
getLockPid(KeyOrKeys) ->
case is_list(KeyOrKeys) of
true ->
KeyIxs = getKexIxs(KeyOrKeys, []),
[eNifLock:releaseLock(OneKeyIx) || OneKeyIx <- KeyIxs],
ok;
[{OneKey, eNifLock:getLockPid(OneKey)} || OneKey <- KeyOrKeys];
_ ->
eNifLock:releaseLock(erlang:phash2(KeyOrKeys, ?eGLockSize)),
ok
{KeyOrKeys, eNifLock:getLockPid(KeyOrKeys)}
end.
-spec lockApply(KeyOrKeys :: term() | [term()], MFAOrFun :: {M :: atom(), F :: atom(), Args :: list()} | {Fun :: function(), Args :: list()}) -> term() | {error, ltimeout} | {error, {lock_apply_error, term()}}.
@ -92,55 +64,21 @@ lockApply(KeyOrKeys, MFAOrFun) ->
-spec lockApply(KeyOrKeys :: term() | [term()], MFAOrFun :: {M :: atom(), F :: atom(), Args :: list()} | {Fun :: function(), Args :: list()}, TimeOut :: integer() | infinity) -> term().
lockApply(KeyOrKeys, MFAOrFun, TimeOut) ->
case is_list(KeyOrKeys) of
case doTryLock(KeyOrKeys, TimeOut) of
true ->
KeyIxs = getKexIxs(KeyOrKeys, []),
case doTryLocks(KeyIxs, TimeOut) of
true ->
try doApply(MFAOrFun)
catch C:R:S ->
{error, {lock_apply_error, {C, R, S}}}
after
[eNifLock:releaseLock(OneKeyIx) || OneKeyIx <- KeyIxs],
ok
end;
ltimeout ->
{error, ltimeout}
try doApply(MFAOrFun)
catch C:R:S ->
{error, {lock_apply_error, {C, R, S}}}
after
eNifLock:releaseLock(KeyOrKeys),
ok
end;
_ ->
KeyIx = erlang:phash2(KeyOrKeys, ?eGLockSize),
case doTryLock(KeyIx, TimeOut) of
true ->
try doApply(MFAOrFun)
catch C:R:S ->
{error, {lock_apply_error, {C, R, S}}}
after
eNifLock:releaseLock(KeyIx),
ok
end;
ltimeout ->
{error, ltimeout}
end
end.
getKexIxs([], IxAcc) -> IxAcc;
getKexIxs([Key | Keys], IxAcc) ->
KeyIx = erlang:phash2(Key, ?eGLockSize),
getKexIxs(Keys, ?CASE(lists:member(KeyIx, IxAcc), IxAcc, [KeyIx | IxAcc])).
tryLockAll([], _LockAcc) ->
true;
tryLockAll([KeyIx | KeyIxs], LockAcc) ->
case eNifLock:tryLock(KeyIx) of
true ->
tryLockAll(KeyIxs, [KeyIx | LockAcc]);
_ ->
[eNifLock:releaseLock(OneLock) || OneLock <- LockAcc],
false
ltimeout ->
{error, ltimeout}
end.
doApply({M, F, A}) ->
apply(M, F, A);
doApply({M, F, Args}) ->
apply(M, F, Args);
doApply({Fun, Args}) ->
apply(Fun, Args).

+ 2
- 2
src/eGLock.erl Datei anzeigen

@ -214,8 +214,8 @@ tryLockAll([KeyIx | KeyIxs], ALockRef, PidInt, LockAcc) ->
false
end.
doApply({M, F, A}) ->
apply(M, F, A);
doApply({M, F, Args}) ->
apply(M, F, Args);
doApply({Fun, Args}) ->
apply(Fun, Args).

+ 12
- 8
src/eNifLock.erl Datei anzeigen

@ -1,6 +1,10 @@
-module(eNifLock).
-export([tryLock/1, releaseLock/1, getLockPid/1]).
-export([
tryLock/1
, releaseLock/1
, getLockPid/1
]).
-on_load(init/0).
@ -19,14 +23,14 @@ init() ->
end,
erlang:load_nif(SoName, 0).
-spec tryLock(KeyIx :: non_neg_integer()) -> true | false.
tryLock(_KeyIx) ->
-spec tryLock(_KeyOrKeys :: term() | [term()]) -> true | false.
tryLock(_KeyOrKeys) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}).
-spec releaseLock(KeyIx :: non_neg_integer()) -> true | false.
releaseLock(_KeyIx) ->
-spec releaseLock(_KeyOrKeys :: term() | [term()]) -> true | false.
releaseLock(_KeyOrKeys) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}).
-spec getLockPid(KeyIx :: non_neg_integer()) -> pid() | undefined.
getLockPid(_KeyIx) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, ?LINE}]}).
-spec getLockPid(OneKey :: term()) -> pid() | undefined.
getLockPid(_OneKey) ->
erlang:nif_error({not_loaded, [{module, ?MODULE}, {line1, ?LINE}]}).

Laden…
Abbrechen
Speichern