erlang各种有用的函数包括一些有用nif封装,还有一些性能测试case。
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

905 строки
27 KiB

5 лет назад
  1. #include "NeuralTable.h"
  2. /* !!!! A NOTE ON KEYS !!!!
  3. * Keys should be integer values passed from the erlang emulator,
  4. * and should be generated by a hashing function. There is no easy
  5. * way to hash an erlang term from a NIF, but ERTS is more than
  6. * capable of doing so.
  7. *
  8. * Additionally, this workaround means that traditional collision
  9. * handling mechanisms for hash tables will not work without
  10. * special consideration. For instance, to compare keys as you
  11. * would by storing linked lists, you must retrieve the stored
  12. * tuple and call enif_compare or enif_is_identical on the key
  13. * elements of each tuple.
  14. */
// Static state shared by every NIF entry point.
table_set NeuralTable::tables;            // registry: table name -> NeuralTable*
atomic<bool> NeuralTable::running(true);  // cleared at shutdown to stop the worker threads
ErlNifMutex *NeuralTable::table_mutex;    // guards `tables`; presumably created on NIF load — not visible here
  18. NeuralTable::NeuralTable(unsigned int kp) {
  19. for (int i = 0; i < BUCKET_COUNT; ++i) {
  20. ErlNifEnv *env = enif_alloc_env();
  21. env_buckets[i] = env;
  22. locks[i] = enif_rwlock_create("neural_table");
  23. garbage_cans[i] = 0;
  24. reclaimable[i] = enif_make_list(env, 0);
  25. }
  26. start_gc();
  27. start_batch();
  28. key_pos = kp;
  29. }
  30. NeuralTable::~NeuralTable() {
  31. stop_batch();
  32. stop_gc();
  33. for (int i = 0; i < BUCKET_COUNT; ++i) {
  34. enif_rwlock_destroy(locks[i]);
  35. enif_free_env(env_buckets[i]);
  36. }
  37. }
  38. /* ================================================================
  39. * MakeTable
  40. * Allocates a new table, assuming a unique atom identifier. This
  41. * table is stored in a static container. All interactions with
  42. * the table must be performed through the static class API.
  43. */
  44. ERL_NIF_TERM NeuralTable::MakeTable(ErlNifEnv *env, ERL_NIF_TERM name, ERL_NIF_TERM key_pos) {
  45. char *atom;
  46. string key;
  47. unsigned int len = 0,
  48. pos = 0;
  49. ERL_NIF_TERM ret;
  50. // Allocate space for the name of the table
  51. enif_get_atom_length(env, name, &len, ERL_NIF_LATIN1);
  52. atom = (char*)enif_alloc(len + 1);
  53. // Fetch the value of the atom and store it in a string (because I can, that's why)
  54. enif_get_atom(env, name, atom, len + 1, ERL_NIF_LATIN1);
  55. key = atom;
  56. // Deallocate that space
  57. enif_free(atom);
  58. // Get the key position value
  59. enif_get_uint(env, key_pos, &pos);
  60. enif_mutex_lock(table_mutex);
  61. if (NeuralTable::tables.find(key) != NeuralTable::tables.end()) {
  62. // Table already exists? Bad monkey!
  63. ret = enif_make_badarg(env);
  64. } else {
  65. // All good. Make the table
  66. NeuralTable::tables[key] = new NeuralTable(pos);
  67. ret = enif_make_atom(env, "ok");
  68. }
  69. enif_mutex_unlock(table_mutex);
  70. return ret;
  71. }
  72. /* ================================================================
  73. * GetTable
  74. * Retrieves a handle to the table referenced by name, assuming
  75. * such a table exists. If not, throw badarg.
  76. */
  77. NeuralTable* NeuralTable::GetTable(ErlNifEnv *env, ERL_NIF_TERM name) {
  78. char *atom = NULL;
  79. string key;
  80. unsigned len = 0;
  81. NeuralTable *ret = NULL;
  82. table_set::const_iterator it;
  83. // Allocate space for the table name
  84. enif_get_atom_length(env, name, &len, ERL_NIF_LATIN1);
  85. atom = (char*)enif_alloc(len + 1);
  86. // Copy the table name into a string
  87. enif_get_atom(env, name, atom, len + 1, ERL_NIF_LATIN1);
  88. key = atom;
  89. // Deallocate that space
  90. enif_free(atom);
  91. // Look for the table and return its pointer if found
  92. it = NeuralTable::tables.find(key);
  93. if (it != NeuralTable::tables.end()) {
  94. ret = it->second;
  95. }
  96. return ret;
  97. }
  98. /* ================================================================
  99. * Insert
  100. * Inserts a tuple into the table with key.
  101. */
  102. ERL_NIF_TERM NeuralTable::Insert(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM object) {
  103. NeuralTable *tb;
  104. ERL_NIF_TERM ret, old;
  105. unsigned long int entry_key = 0;
  106. // Grab table or bail.
  107. tb = GetTable(env, table);
  108. if (tb == NULL) {
  109. return enif_make_badarg(env);
  110. }
  111. // Get key value.
  112. enif_get_ulong(env, key, &entry_key);
  113. // Lock the key.
  114. tb->rwlock(entry_key);
  115. // Attempt to lookup the value. If nonempty, increment
  116. // discarded term counter and return a copy of the
  117. // old value
  118. if (tb->find(entry_key, old)) {
  119. tb->reclaim(entry_key, old);
  120. ret = enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_copy(env, old));
  121. } else {
  122. ret = enif_make_atom(env, "ok");
  123. }
  124. // Write that shit out
  125. tb->put(entry_key, object);
  126. // Oh, and unlock the key if you would.
  127. tb->rwunlock(entry_key);
  128. return ret;
  129. }
  130. /* ================================================================
  131. * InsertNew
  132. * Inserts a tuple into the table with key, assuming there is not
  133. * a value with key already. Returns true if there was no value
  134. * for key, or false if there was.
  135. */
  136. ERL_NIF_TERM NeuralTable::InsertNew(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM object) {
  137. NeuralTable *tb;
  138. ERL_NIF_TERM ret, old;
  139. unsigned long int entry_key = 0;
  140. // Get the table or bail
  141. tb = GetTable(env, table);
  142. if (tb == NULL) {
  143. return enif_make_badarg(env);
  144. }
  145. // Get the key value
  146. enif_get_ulong(env, key, &entry_key);
  147. // Get write lock for the key
  148. tb->rwlock(entry_key);
  149. if (tb->find(entry_key, old)) {
  150. // Key was found. Return false and do not insert
  151. ret = enif_make_atom(env, "false");
  152. } else {
  153. // Key was not found. Return true and insert
  154. tb->put(entry_key, object);
  155. ret = enif_make_atom(env, "true");
  156. }
  157. // Release write lock for the key
  158. tb->rwunlock(entry_key);
  159. return ret;
  160. }
  161. /* ================================================================
  162. * Increment
  163. * Processes a list of update operations. Each operation specifies
  164. * a position in the stored tuple to update and an integer to add
  165. * to it.
  166. */
// Apply a list of {Position, Increment} operations to the tuple stored
// under `key`. Each target element must be a number; the incremented
// values are returned (in reverse op order) as a list. Returns badarg
// if the key is absent, a position is out of range, or a target element
// is not a number.
// NOTE(review): enif_get_ulong / enif_get_list_cell / enif_get_tuple
// return codes are unchecked here — malformed ops reuse stale values.
ERL_NIF_TERM NeuralTable::Increment(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM ops) {
    NeuralTable *tb;
    ERL_NIF_TERM ret, old;
    ERL_NIF_TERM it;
    unsigned long int entry_key = 0;
    // Get table handle or bail
    tb = GetTable(env, table);
    if (tb == NULL) {
        return enif_make_badarg(env);
    }
    // Get key value
    enif_get_ulong(env, key, &entry_key);
    // Acquire read/write lock for key
    tb->rwlock(entry_key);
    // Try to read the value as it is
    if (tb->find(entry_key, old)) {
        // Value exists
        ERL_NIF_TERM op_cell;
        const ERL_NIF_TERM *tb_tpl;
        const ERL_NIF_TERM *op_tpl;
        ERL_NIF_TERM *new_tpl;
        ErlNifEnv *bucket_env = tb->get_env(entry_key);
        unsigned long int pos = 0;
        long int incr = 0;
        unsigned int ops_length = 0;
        int op_arity = 0,
            tb_arity = 0;
        // Expand tuple to work on elements
        enif_get_tuple(bucket_env, old, &tb_arity, &tb_tpl);
        // Allocate space for a copy the contents of the table
        // tuple and copy it in. All changes are to be made to
        // the copy of the tuple.
        new_tpl = (ERL_NIF_TERM*)enif_alloc(sizeof(ERL_NIF_TERM) * tb_arity);
        memcpy(new_tpl, tb_tpl, sizeof(ERL_NIF_TERM) * tb_arity);
        // Create empty list cell for return value.
        ret = enif_make_list(env, 0);
        // Set iterator to first cell of ops
        it = ops;
        while(!enif_is_empty_list(env, it)) {
            long int value = 0;
            enif_get_list_cell(env, it, &op_cell, &it);       // op_cell = hd(it), it = tl(it)
            enif_get_tuple(env, op_cell, &op_arity, &op_tpl); // op_arity = tuple_size(op_cell), op_tpl = [TplPos1, TplPos2]
            enif_get_ulong(env, op_tpl[0], &pos);             // pos = (uint64)op_tpl[0]
            enif_get_long(env, op_tpl[1], &incr);             // incr = (uint64)op_tpl[1]
            // Is the operation trying to modify a nonexistant
            // position? (positions are 1-based)
            if (pos <= 0 || pos > tb_arity) {
                ret = enif_make_badarg(env);
                goto bailout;
            }
            // Is the operation trying to add to a value that's
            // not a number?
            if (!enif_is_number(bucket_env, new_tpl[pos - 1])) {
                ret = enif_make_badarg(env);
                goto bailout;
            }
            // Update the value stored in the tuple: queue the old
            // element for reclamation, build the new one in the
            // bucket-owned env so it survives this call.
            enif_get_long(env, new_tpl[pos - 1], &value);
            tb->reclaim(entry_key, new_tpl[pos - 1]);
            new_tpl[pos - 1] = enif_make_long(bucket_env, value + incr);
            // Copy the new value to the head of the return list
            ret = enif_make_list_cell(env, enif_make_copy(env, new_tpl[pos - 1]), ret);
        }
        tb->put(entry_key, enif_make_tuple_from_array(bucket_env, new_tpl, tb_arity));
        // Bailout allows cancelling the update opertion
        // in case something goes wrong. It must always
        // come after tb->put and before enif_free and
        // rwunlock
        bailout:
        enif_free(new_tpl);
    } else {
        ret = enif_make_badarg(env);
    }
    // Release the rwlock for entry_key
    tb->rwunlock(entry_key);
    return ret;
}
  244. /* ================================================================
  245. * Unshift
  246. * Processes a list of update operations. Each update operation is
  247. * a tuple specifying the position of a list in the stored value to
  248. * update and a list of values to append. Elements are shifted from
  249. * the input list to the stored list, so:
  250. *
  251. * unshift([a,b,c,d]) results in [d,c,b,a]
  252. */
  253. ERL_NIF_TERM NeuralTable::Unshift(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM ops) {
  254. NeuralTable *tb;
  255. ERL_NIF_TERM ret, old, it;
  256. unsigned long int entry_key;
  257. ErlNifEnv *bucket_env;
  258. tb = GetTable(env, table);
  259. if (tb == NULL) {
  260. return enif_make_badarg(env);
  261. }
  262. enif_get_ulong(env, key, &entry_key);
  263. tb->rwlock(entry_key);
  264. bucket_env = tb->get_env(entry_key);
  265. if (tb->find(entry_key, old)) {
  266. const ERL_NIF_TERM *old_tpl,
  267. *op_tpl;
  268. ERL_NIF_TERM *new_tpl;
  269. int tb_arity = 0,
  270. op_arity = 0;
  271. unsigned long pos = 0;
  272. unsigned int new_length = 0;
  273. ERL_NIF_TERM op,
  274. unshift,
  275. copy_it,
  276. copy_val;
  277. enif_get_tuple(bucket_env, old, &tb_arity, &old_tpl);
  278. new_tpl = (ERL_NIF_TERM*)enif_alloc(sizeof(ERL_NIF_TERM) * tb_arity);
  279. memcpy(new_tpl, old_tpl, sizeof(ERL_NIF_TERM) * tb_arity);
  280. it = ops;
  281. ret = enif_make_list(env, 0);
  282. while (!enif_is_empty_list(env, it)) {
  283. // Examine the operation.
  284. enif_get_list_cell(env, it, &op, &it); // op = hd(it), it = tl(it)
  285. enif_get_tuple(env, op, &op_arity, &op_tpl); // op_arity = tuple_size(op), op_tpl = [TplPos1, TplPos2]
  286. enif_get_ulong(env, op_tpl[0], &pos); // Tuple position to modify
  287. unshift = op_tpl[1]; // Values to unshfit
  288. // Argument 1 of the operation tuple is position;
  289. // make sure it's within the bounds of the tuple
  290. // in the table.
  291. if (pos <= 0 || pos > tb_arity) {
  292. ret = enif_make_badarg(env);
  293. goto bailout;
  294. }
  295. // Make sure we were passed a list of things to push
  296. // onto the posth element of the entry
  297. if (!enif_is_list(env, unshift)) {
  298. ret = enif_make_badarg(env);
  299. }
  300. // Now iterate over unshift, moving its values to
  301. // the head of new_tpl[pos - 1] one by one
  302. copy_it = unshift;
  303. while (!enif_is_empty_list(env, copy_it)) {
  304. enif_get_list_cell(env, copy_it, &copy_val, &copy_it);
  305. new_tpl[pos - 1] = enif_make_list_cell(bucket_env, enif_make_copy(bucket_env, copy_val), new_tpl[pos - 1]);
  306. }
  307. enif_get_list_length(bucket_env, new_tpl[pos - 1], &new_length);
  308. ret = enif_make_list_cell(env, enif_make_uint(env, new_length), ret);
  309. }
  310. tb->put(entry_key, enif_make_tuple_from_array(bucket_env, new_tpl, tb_arity));
  311. bailout:
  312. enif_free(new_tpl);
  313. } else {
  314. ret = enif_make_badarg(env);
  315. }
  316. tb->rwunlock(entry_key);
  317. return ret;
  318. }
  319. ERL_NIF_TERM NeuralTable::Shift(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM ops) {
  320. NeuralTable *tb;
  321. ERL_NIF_TERM ret, old, it;
  322. unsigned long int entry_key;
  323. ErlNifEnv *bucket_env;
  324. tb = GetTable(env, table);
  325. if (tb == NULL) {
  326. return enif_make_badarg(env);
  327. }
  328. enif_get_ulong(env, key, &entry_key);
  329. tb->rwlock(entry_key);
  330. bucket_env = tb->get_env(entry_key);
  331. if (tb->find(entry_key, old)) {
  332. const ERL_NIF_TERM *old_tpl;
  333. const ERL_NIF_TERM *op_tpl;
  334. ERL_NIF_TERM *new_tpl;
  335. int tb_arity = 0,
  336. op_arity = 0;
  337. unsigned long pos = 0,
  338. count = 0;
  339. ERL_NIF_TERM op, list, shifted, reclaim;
  340. enif_get_tuple(bucket_env, old, &tb_arity, &old_tpl);
  341. new_tpl = (ERL_NIF_TERM*)enif_alloc(tb_arity * sizeof(ERL_NIF_TERM));
  342. memcpy(new_tpl, old_tpl, sizeof(ERL_NIF_TERM) * tb_arity);
  343. it = ops;
  344. ret = enif_make_list(env, 0);
  345. reclaim = enif_make_list(bucket_env, 0);
  346. while(!enif_is_empty_list(env, it)) {
  347. enif_get_list_cell(env, it, &op, &it);
  348. enif_get_tuple(env, op, &op_arity, &op_tpl);
  349. enif_get_ulong(env, op_tpl[0], &pos);
  350. enif_get_ulong(env, op_tpl[1], &count);
  351. if (pos <= 0 || pos > tb_arity) {
  352. ret = enif_make_badarg(env);
  353. goto bailout;
  354. }
  355. if (!enif_is_list(env, new_tpl[pos -1])) {
  356. ret = enif_make_badarg(env);
  357. goto bailout;
  358. }
  359. shifted = enif_make_list(env, 0);
  360. if (count > 0) {
  361. ERL_NIF_TERM copy_it = new_tpl[pos - 1],
  362. val;
  363. int i = 0;
  364. while (i < count && !enif_is_empty_list(bucket_env, copy_it)) {
  365. enif_get_list_cell(bucket_env, copy_it, &val, &copy_it);
  366. ++i;
  367. shifted = enif_make_list_cell(env, enif_make_copy(env, val), shifted);
  368. reclaim = enif_make_list_cell(env, val, reclaim);
  369. }
  370. new_tpl[pos - 1] = copy_it;
  371. } else if (count < 0) {
  372. ERL_NIF_TERM copy_it = new_tpl[pos - 1],
  373. val;
  374. while (!enif_is_empty_list(bucket_env, copy_it)) {
  375. enif_get_list_cell(bucket_env, copy_it, &val, &copy_it);
  376. shifted = enif_make_list_cell(env, enif_make_copy(env, val), shifted);
  377. reclaim = enif_make_list_cell(env, val, reclaim);
  378. }
  379. new_tpl[pos - 1] = copy_it;
  380. }
  381. ret = enif_make_list_cell(env, shifted, ret);
  382. }
  383. tb->put(entry_key, enif_make_tuple_from_array(bucket_env, new_tpl, tb_arity));
  384. tb->reclaim(entry_key, reclaim);
  385. bailout:
  386. enif_free(new_tpl);
  387. } else {
  388. ret = enif_make_badarg(env);
  389. }
  390. tb->rwunlock(entry_key);
  391. return ret;
  392. }
  393. ERL_NIF_TERM NeuralTable::Swap(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key, ERL_NIF_TERM ops) {
  394. NeuralTable *tb;
  395. ERL_NIF_TERM ret, old, it;
  396. unsigned long int entry_key;
  397. ErlNifEnv *bucket_env;
  398. tb = GetTable(env, table);
  399. if (tb == NULL) {
  400. return enif_make_badarg(env);
  401. }
  402. enif_get_ulong(env, key, &entry_key);
  403. tb->rwlock(entry_key);
  404. bucket_env = tb->get_env(entry_key);
  405. if (tb->find(entry_key, old)) {
  406. const ERL_NIF_TERM *old_tpl;
  407. const ERL_NIF_TERM *op_tpl;
  408. ERL_NIF_TERM *new_tpl;
  409. int tb_arity = 0,
  410. op_arity = 0;
  411. unsigned long pos = 0;
  412. ERL_NIF_TERM op, list, shifted, reclaim;
  413. enif_get_tuple(bucket_env, old, &tb_arity, &old_tpl);
  414. new_tpl = (ERL_NIF_TERM*)enif_alloc(tb_arity * sizeof(ERL_NIF_TERM));
  415. memcpy(new_tpl, old_tpl, sizeof(ERL_NIF_TERM) * tb_arity);
  416. it = ops;
  417. ret = enif_make_list(env, 0);
  418. reclaim = enif_make_list(bucket_env, 0);
  419. while (!enif_is_empty_list(env, it)) {
  420. enif_get_list_cell(env, it, &op, &it);
  421. enif_get_tuple(env, op, &op_arity, &op_tpl);
  422. enif_get_ulong(env, op_tpl[0], &pos);
  423. if (pos <= 0 || pos > tb_arity) {
  424. ret = enif_make_badarg(env);
  425. goto bailout;
  426. }
  427. reclaim = enif_make_list_cell(bucket_env, new_tpl[pos - 1], reclaim);
  428. ret = enif_make_list_cell(env, enif_make_copy(env, new_tpl[pos -1]), ret);
  429. new_tpl[pos - 1] = enif_make_copy(bucket_env, op_tpl[1]);
  430. }
  431. tb->put(entry_key, enif_make_tuple_from_array(bucket_env, new_tpl, tb_arity));
  432. tb->reclaim(entry_key, reclaim);
  433. bailout:
  434. enif_free(new_tpl);
  435. } else {
  436. ret = enif_make_badarg(env);
  437. }
  438. tb->rwunlock(entry_key);
  439. return ret;
  440. }
  441. ERL_NIF_TERM NeuralTable::Delete(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key) {
  442. NeuralTable *tb;
  443. ERL_NIF_TERM val, ret;
  444. unsigned long int entry_key;
  445. tb = GetTable(env, table);
  446. if (tb == NULL) { return enif_make_badarg(env); }
  447. enif_get_ulong(env, key, &entry_key);
  448. tb->rwlock(entry_key);
  449. if (tb->erase(entry_key, val)) {
  450. tb->reclaim(entry_key, val);
  451. ret = enif_make_copy(env, val);
  452. } else {
  453. ret = enif_make_atom(env, "undefined");
  454. }
  455. tb->rwunlock(entry_key);
  456. return ret;
  457. }
  458. ERL_NIF_TERM NeuralTable::Empty(ErlNifEnv *env, ERL_NIF_TERM table) {
  459. NeuralTable *tb;
  460. int n = 0;
  461. tb = GetTable(env, table);
  462. if (tb == NULL) { return enif_make_badarg(env); }
  463. // First, lock EVERY bucket. We want this to be an isolated operation.
  464. for (n = 0; n < BUCKET_COUNT; ++n) {
  465. enif_rwlock_rwlock(tb->locks[n]);
  466. }
  467. // Now clear the table
  468. for (n = 0; n < BUCKET_COUNT; ++n) {
  469. tb->hash_buckets[n].clear();
  470. enif_clear_env(tb->env_buckets[n]);
  471. tb->garbage_cans[n] = 0;
  472. tb->reclaimable[n] = enif_make_list(tb->env_buckets[n], 0);
  473. }
  474. // Now unlock every bucket.
  475. for (n = 0; n < BUCKET_COUNT; ++n) {
  476. enif_rwlock_rwunlock(tb->locks[n]);
  477. }
  478. return enif_make_atom(env, "ok");
  479. }
  480. ERL_NIF_TERM NeuralTable::Get(ErlNifEnv *env, ERL_NIF_TERM table, ERL_NIF_TERM key) {
  481. NeuralTable *tb;
  482. ERL_NIF_TERM ret, val;
  483. unsigned long int entry_key;
  484. // Acquire table handle, or quit if the table doesn't exist.
  485. tb = GetTable(env, table);
  486. if (tb == NULL) { return enif_make_badarg(env); }
  487. // Get key value
  488. enif_get_ulong(env, key, &entry_key);
  489. // Lock the key
  490. tb->rlock(entry_key);
  491. // Read current value
  492. if (!tb->find(entry_key, val)) {
  493. ret = enif_make_atom(env, "undefined");
  494. } else {
  495. ret = enif_make_copy(env, val);
  496. }
  497. tb->runlock(entry_key);
  498. return ret;
  499. }
  500. ERL_NIF_TERM NeuralTable::Dump(ErlNifEnv *env, ERL_NIF_TERM table) {
  501. NeuralTable *tb = GetTable(env, table);
  502. ErlNifPid self;
  503. ERL_NIF_TERM ret;
  504. if (tb == NULL) { return enif_make_badarg(env); }
  505. enif_self(env, &self);
  506. tb->add_batch_job(self, &NeuralTable::batch_dump);
  507. return enif_make_atom(env, "$neural_batch_wait");
  508. }
  509. ERL_NIF_TERM NeuralTable::Drain(ErlNifEnv *env, ERL_NIF_TERM table) {
  510. NeuralTable *tb = GetTable(env, table);
  511. ErlNifPid self;
  512. int ret;
  513. if (tb == NULL) { return enif_make_badarg(env); }
  514. enif_self(env, &self);
  515. tb->add_batch_job(self, &NeuralTable::batch_drain);
  516. return enif_make_atom(env, "$neural_batch_wait");
  517. }
  518. ERL_NIF_TERM NeuralTable::GetKeyPosition(ErlNifEnv *env, ERL_NIF_TERM table) {
  519. NeuralTable *tb = GetTable(env, table);
  520. if (tb == NULL) { return enif_make_badarg(env); }
  521. return enif_make_uint(env, tb->key_pos);
  522. }
  523. ERL_NIF_TERM NeuralTable::GarbageCollect(ErlNifEnv *env, ERL_NIF_TERM table) {
  524. NeuralTable *tb = GetTable(env, table);
  525. if (tb == NULL) { return enif_make_badarg(env); }
  526. enif_cond_signal(tb->gc_cond);
  527. return enif_make_atom(env, "ok");
  528. }
  529. ERL_NIF_TERM NeuralTable::GarbageSize(ErlNifEnv *env, ERL_NIF_TERM table) {
  530. NeuralTable *tb = GetTable(env, table);
  531. unsigned long int size = 0;
  532. if (tb == NULL) { return enif_make_badarg(env); }
  533. size = tb->garbage_size();
  534. return enif_make_ulong(env, size);
  535. }
// Thread entry point for the per-table garbage collector.
// Sleeps on gc_cond (under gc_mutex) until the estimated garbage
// crosses RECLAIM_THRESHOLD or `running` is cleared, then compacts
// every bucket via gc(). One final sweep also runs on shutdown wake-up.
void* NeuralTable::DoGarbageCollection(void *table) {
    NeuralTable *tb = (NeuralTable*)table;
    enif_mutex_lock(tb->gc_mutex);
    while (running.load(memory_order_acquire)) {
        // Wait until there is enough garbage to be worth a sweep,
        // re-checking `running` on every wake-up.
        while (running.load(memory_order_acquire) && tb->garbage_size() < RECLAIM_THRESHOLD) {
            enif_cond_wait(tb->gc_cond, tb->gc_mutex);
        }
        tb->gc();
    }
    enif_mutex_unlock(tb->gc_mutex);
    return NULL;
}
  548. void* NeuralTable::DoReclamation(void *table) {
  549. const int max_eat = 5;
  550. NeuralTable *tb = (NeuralTable*)table;
  551. int i = 0, c = 0, t = 0;;
  552. ERL_NIF_TERM tl, hd;
  553. ErlNifEnv *env;
  554. while (running.load(memory_order_acquire)) {
  555. for (i = 0; i < BUCKET_COUNT; ++i) {
  556. c = 0;
  557. t = 0;
  558. tb->rwlock(i);
  559. env = tb->get_env(i);
  560. tl = tb->reclaimable[i];
  561. while (c++ < max_eat && !enif_is_empty_list(env, tl)) {
  562. enif_get_list_cell(env, tl, &hd, &tl);
  563. tb->garbage_cans[i] += estimate_size(env, hd);
  564. t += tb->garbage_cans[i];
  565. }
  566. tb->rwunlock(i);
  567. if (t >= RECLAIM_THRESHOLD) {
  568. enif_cond_signal(tb->gc_cond);
  569. }
  570. }
  571. #ifdef _WIN32
  572. Sleep(50);
  573. #else
  574. usleep(50000);
  575. #endif
  576. }
  577. return NULL;
  578. }
  579. void* NeuralTable::DoBatchOperations(void *table) {
  580. NeuralTable *tb = (NeuralTable*)table;
  581. enif_mutex_lock(tb->batch_mutex);
  582. while (running.load(memory_order_acquire)) {
  583. while (running.load(memory_order_acquire) && tb->batch_jobs.empty()) {
  584. enif_cond_wait(tb->batch_cond, tb->batch_mutex);
  585. }
  586. BatchJob job = tb->batch_jobs.front();
  587. (tb->*job.fun)(job.pid);
  588. tb->batch_jobs.pop();
  589. }
  590. enif_mutex_unlock(tb->batch_mutex);
  591. return NULL;
  592. }
  593. void NeuralTable::start_gc() {
  594. int ret;
  595. gc_mutex = enif_mutex_create("neural_table_gc");
  596. gc_cond = enif_cond_create("neural_table_gc");
  597. ret = enif_thread_create("neural_garbage_collector", &gc_tid, NeuralTable::DoGarbageCollection, (void*)this, NULL);
  598. if (ret != 0) {
  599. printf("[neural_gc] Can't create GC thread. Error Code: %d\r\n", ret);
  600. }
  601. // Start the reclaimer after the garbage collector.
  602. ret = enif_thread_create("neural_reclaimer", &rc_tid, NeuralTable::DoReclamation, (void*)this, NULL);
  603. if (ret != 0) {
  604. printf("[neural_gc] Can't create reclamation thread. Error Code: %d\r\n", ret);
  605. }
  606. }
// Shut down the GC machinery: wake the collector, then join both
// background threads (reclaimer first, mirroring the start order).
// NOTE(review): this relies on `running` already having been cleared
// by the caller/unload path — the signal alone does not stop the
// loops, and the reclaimer is never signaled (it polls). Confirm the
// shutdown sequence sets `running` before destruction.
void NeuralTable::stop_gc() {
    enif_cond_signal(gc_cond);
    // Join the reclaimer before the garbage collector.
    enif_thread_join(rc_tid, NULL);
    enif_thread_join(gc_tid, NULL);
}
  613. void NeuralTable::start_batch() {
  614. int ret;
  615. batch_mutex = enif_mutex_create("neural_table_batch");
  616. batch_cond = enif_cond_create("neural_table_batch");
  617. ret = enif_thread_create("neural_batcher", &batch_tid, NeuralTable::DoBatchOperations, (void*)this, NULL);
  618. if (ret != 0) {
  619. printf("[neural_batch] Can't create batch thread. Error Code: %d\r\n", ret);
  620. }
  621. }
// Wake the batch worker and wait for it to exit.
// NOTE(review): like stop_gc, this assumes `running` was already
// cleared elsewhere; the signal by itself does not terminate the
// worker loop — confirm against the NIF unload path.
void NeuralTable::stop_batch() {
    enif_cond_signal(batch_cond);
    enif_thread_join(batch_tid, NULL);
}
  626. void NeuralTable::put(unsigned long int key, ERL_NIF_TERM tuple) {
  627. ErlNifEnv *env = get_env(key);
  628. hash_buckets[GET_BUCKET(key)][key] = enif_make_copy(env, tuple);
  629. }
  630. ErlNifEnv* NeuralTable::get_env(unsigned long int key) {
  631. return env_buckets[GET_BUCKET(key)];
  632. }
  633. bool NeuralTable::find(unsigned long int key, ERL_NIF_TERM &ret) {
  634. hash_table *bucket = &hash_buckets[GET_BUCKET(key)];
  635. hash_table::iterator it = bucket->find(key);
  636. if (bucket->end() == it) {
  637. return false;
  638. } else {
  639. ret = it->second;
  640. return true;
  641. }
  642. }
  643. bool NeuralTable::erase(unsigned long int key, ERL_NIF_TERM &val) {
  644. hash_table *bucket = &hash_buckets[GET_BUCKET(key)];
  645. hash_table::iterator it = bucket->find(key);
  646. bool ret = false;
  647. if (it != bucket->end()) {
  648. ret = true;
  649. val = it->second;
  650. bucket->erase(it);
  651. }
  652. return ret;
  653. }
  654. void NeuralTable::add_batch_job(ErlNifPid pid, BatchFunction fun) {
  655. BatchJob job;
  656. job.pid = pid;
  657. job.fun = fun;
  658. enif_mutex_lock(batch_mutex);
  659. batch_jobs.push(job);
  660. enif_mutex_unlock(batch_mutex);
  661. enif_cond_signal(batch_cond);
  662. }
// Batch job: move every stored tuple out of the table and send the
// collected list to `pid` as {'$neural_batch_response', Values}.
// Each bucket is emptied under its write lock, so per-bucket the drain
// is atomic (the whole-table drain spans several lock acquisitions).
void NeuralTable::batch_drain(ErlNifPid pid) {
    ErlNifEnv *env = enif_alloc_env();
    ERL_NIF_TERM msg, value;
    value = enif_make_list(env, 0);
    for (int i = 0; i < BUCKET_COUNT; ++i) {
        enif_rwlock_rwlock(locks[i]);
        // Copy every value out into the message env before wiping.
        for (hash_table::iterator it = hash_buckets[i].begin(); it != hash_buckets[i].end(); ++it) {
            value = enif_make_list_cell(env, enif_make_copy(env, it->second), value);
        }
        // Release all bucket-owned terms and reset garbage bookkeeping.
        enif_clear_env(env_buckets[i]);
        hash_buckets[i].clear();
        garbage_cans[i] = 0;
        reclaimable[i] = enif_make_list(env_buckets[i], 0);
        enif_rwlock_rwunlock(locks[i]);
    }
    msg = enif_make_tuple2(env, enif_make_atom(env, "$neural_batch_response"), value);
    enif_send(NULL, &pid, env, msg);
    enif_free_env(env);
}
  682. void NeuralTable::batch_dump(ErlNifPid pid) {
  683. ErlNifEnv *env = enif_alloc_env();
  684. ERL_NIF_TERM msg, value;
  685. value = enif_make_list(env, 0);
  686. for (int i = 0; i < BUCKET_COUNT; ++i) {
  687. enif_rwlock_rlock(locks[i]);
  688. for (hash_table::iterator it = hash_buckets[i].begin(); it != hash_buckets[i].end(); ++it) {
  689. value = enif_make_list_cell(env, enif_make_copy(env, it->second), value);
  690. }
  691. enif_rwlock_runlock(locks[i]);
  692. }
  693. msg = enif_make_tuple2(env, enif_make_atom(env, "$neural_batch_response"), value);
  694. enif_send(NULL, &pid, env, msg);
  695. enif_free_env(env);
  696. }
  697. void NeuralTable::reclaim(unsigned long int key, ERL_NIF_TERM term) {
  698. int bucket = GET_BUCKET(key);
  699. ErlNifEnv *env = get_env(key);
  700. reclaimable[bucket] = enif_make_list_cell(env, term, reclaimable[bucket]);
  701. }
// Compact every bucket: copy each live stored term into a fresh
// environment, install the fresh env, then free the old env — which
// releases every reclaimed (no-longer-referenced) term at once.
void NeuralTable::gc() {
    ErlNifEnv *fresh = NULL,
              *old = NULL;
    hash_table *bucket = NULL;
    hash_table::iterator it;
    unsigned int gc_curr = 0;
    for (; gc_curr < BUCKET_COUNT; ++gc_curr) {
        bucket = &hash_buckets[gc_curr];
        old = env_buckets[gc_curr];
        // The fresh env is allocated outside the lock to keep the
        // critical section short.
        fresh = enif_alloc_env();
        enif_rwlock_rwlock(locks[gc_curr]);
        // Migrate live entries; anything not copied (i.e. reclaimed
        // garbage) dies with the old env below.
        for (it = bucket->begin(); it != bucket->end(); ++it) {
            it->second = enif_make_copy(fresh, it->second);
        }
        garbage_cans[gc_curr] = 0;
        env_buckets[gc_curr] = fresh;
        reclaimable[gc_curr] = enif_make_list(fresh, 0);
        enif_free_env(old);
        enif_rwlock_rwunlock(locks[gc_curr]);
    }
}
  723. unsigned long int NeuralTable::garbage_size() {
  724. unsigned long int size = 0;
  725. for (int i = 0; i < BUCKET_COUNT; ++i) {
  726. enif_rwlock_rlock(locks[i]);
  727. size += garbage_cans[i];
  728. enif_rwlock_runlock(locks[i]);
  729. }
  730. return size;
  731. }