You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

425 line
12 KiB

  1. // Provides an efficient implementation of a semaphore (LightweightSemaphore).
  2. // This is an extension of Jeff Preshing's semaphore implementation (licensed
  3. // under the terms of its separate zlib license) that has been adapted and
  4. // extended by Cameron Desrochers.
  5. #pragma once
  6. #include <cstddef> // For std::size_t
  7. #include <atomic>
  8. #include <type_traits> // For std::make_signed<T>
  9. #if defined(_WIN32)
  10. // Avoid including windows.h in a header; we only need a handful of
  11. // items, so we'll redeclare them here (this is relatively safe since
  12. // the API generally has to remain stable between Windows versions).
  13. // I know this is an ugly hack but it still beats polluting the global
  14. // namespace with thousands of generic names or adding a .cpp for nothing.
  15. extern "C" {
  16. struct _SECURITY_ATTRIBUTES;
  17. __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
  18. __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
  19. __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
  20. __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
  21. }
  22. #elif defined(__MACH__)
  23. #include <mach/mach.h>
  24. #elif defined(__unix__)
  25. #include <semaphore.h>
  26. #if defined(__GLIBC_PREREQ) && defined(_GNU_SOURCE)
  27. #if __GLIBC_PREREQ(2,30)
  28. #define MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  29. #endif
  30. #endif
  31. #endif
  32. namespace moodycamel
  33. {
  34. namespace details
  35. {
  36. // Code in the details namespace below is an adaptation of Jeff Preshing's
  37. // portable + lightweight semaphore implementations, originally from
  38. // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
  39. // LICENSE:
  40. // Copyright (c) 2015 Jeff Preshing
  41. //
  42. // This software is provided 'as-is', without any express or implied
  43. // warranty. In no event will the authors be held liable for any damages
  44. // arising from the use of this software.
  45. //
  46. // Permission is granted to anyone to use this software for any purpose,
  47. // including commercial applications, and to alter it and redistribute it
  48. // freely, subject to the following restrictions:
  49. //
  50. // 1. The origin of this software must not be misrepresented; you must not
  51. // claim that you wrote the original software. If you use this software
  52. // in a product, an acknowledgement in the product documentation would be
  53. // appreciated but is not required.
  54. // 2. Altered source versions must be plainly marked as such, and must not be
  55. // misrepresented as being the original software.
  56. // 3. This notice may not be removed or altered from any source distribution.
  57. #if defined(_WIN32)
  58. class Semaphore
  59. {
  60. private:
  61. void* m_hSema;
  62. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  63. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  64. public:
  65. Semaphore(int initialCount = 0)
  66. {
  67. assert(initialCount >= 0);
  68. const long maxLong = 0x7fffffff;
  69. m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
  70. assert(m_hSema);
  71. }
  72. ~Semaphore()
  73. {
  74. CloseHandle(m_hSema);
  75. }
  76. bool wait()
  77. {
  78. const unsigned long infinite = 0xffffffff;
  79. return WaitForSingleObject(m_hSema, infinite) == 0;
  80. }
  81. bool try_wait()
  82. {
  83. return WaitForSingleObject(m_hSema, 0) == 0;
  84. }
  85. bool timed_wait(std::uint64_t usecs)
  86. {
  87. return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
  88. }
  89. void signal(int count = 1)
  90. {
  91. while (!ReleaseSemaphore(m_hSema, count, nullptr));
  92. }
  93. };
  94. #elif defined(__MACH__)
  95. //---------------------------------------------------------
  96. // Semaphore (Apple iOS and OSX)
  97. // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
  98. //---------------------------------------------------------
  99. class Semaphore
  100. {
  101. private:
  102. semaphore_t m_sema;
  103. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  104. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  105. public:
  106. Semaphore(int initialCount = 0)
  107. {
  108. assert(initialCount >= 0);
  109. kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
  110. assert(rc == KERN_SUCCESS);
  111. (void)rc;
  112. }
  113. ~Semaphore()
  114. {
  115. semaphore_destroy(mach_task_self(), m_sema);
  116. }
  117. bool wait()
  118. {
  119. return semaphore_wait(m_sema) == KERN_SUCCESS;
  120. }
  121. bool try_wait()
  122. {
  123. return timed_wait(0);
  124. }
  125. bool timed_wait(std::uint64_t timeout_usecs)
  126. {
  127. mach_timespec_t ts;
  128. ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
  129. ts.tv_nsec = static_cast<int>((timeout_usecs % 1000000) * 1000);
  130. // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
  131. kern_return_t rc = semaphore_timedwait(m_sema, ts);
  132. return rc == KERN_SUCCESS;
  133. }
  134. void signal()
  135. {
  136. while (semaphore_signal(m_sema) != KERN_SUCCESS);
  137. }
  138. void signal(int count)
  139. {
  140. while (count-- > 0)
  141. {
  142. while (semaphore_signal(m_sema) != KERN_SUCCESS);
  143. }
  144. }
  145. };
  146. #elif defined(__unix__)
  147. //---------------------------------------------------------
  148. // Semaphore (POSIX, Linux)
  149. //---------------------------------------------------------
  150. class Semaphore
  151. {
  152. private:
  153. sem_t m_sema;
  154. Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  155. Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
  156. public:
  157. Semaphore(int initialCount = 0)
  158. {
  159. assert(initialCount >= 0);
  160. int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
  161. assert(rc == 0);
  162. (void)rc;
  163. }
  164. ~Semaphore()
  165. {
  166. sem_destroy(&m_sema);
  167. }
  168. bool wait()
  169. {
  170. // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
  171. int rc;
  172. do {
  173. rc = sem_wait(&m_sema);
  174. } while (rc == -1 && errno == EINTR);
  175. return rc == 0;
  176. }
  177. bool try_wait()
  178. {
  179. int rc;
  180. do {
  181. rc = sem_trywait(&m_sema);
  182. } while (rc == -1 && errno == EINTR);
  183. return rc == 0;
  184. }
  185. bool timed_wait(std::uint64_t usecs)
  186. {
  187. struct timespec ts;
  188. const int usecs_in_1_sec = 1000000;
  189. const int nsecs_in_1_sec = 1000000000;
  190. #ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  191. clock_gettime(CLOCK_MONOTONIC, &ts);
  192. #else
  193. clock_gettime(CLOCK_REALTIME, &ts);
  194. #endif
  195. ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
  196. ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
  197. // sem_timedwait bombs if you have more than 1e9 in tv_nsec
  198. // so we have to clean things up before passing it in
  199. if (ts.tv_nsec >= nsecs_in_1_sec) {
  200. ts.tv_nsec -= nsecs_in_1_sec;
  201. ++ts.tv_sec;
  202. }
  203. int rc;
  204. do {
  205. #ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
  206. rc = sem_clockwait(&m_sema, CLOCK_MONOTONIC, &ts);
  207. #else
  208. rc = sem_timedwait(&m_sema, &ts);
  209. #endif
  210. } while (rc == -1 && errno == EINTR);
  211. return rc == 0;
  212. }
  213. void signal()
  214. {
  215. while (sem_post(&m_sema) == -1);
  216. }
  217. void signal(int count)
  218. {
  219. while (count-- > 0)
  220. {
  221. while (sem_post(&m_sema) == -1);
  222. }
  223. }
  224. };
  225. #else
  226. #error Unsupported platform! (No semaphore wrapper available)
  227. #endif
  228. } // end namespace details
  229. //---------------------------------------------------------
  230. // LightweightSemaphore
  231. //---------------------------------------------------------
  232. class LightweightSemaphore
  233. {
  234. public:
  235. typedef std::make_signed<std::size_t>::type ssize_t;
  236. private:
  237. std::atomic<ssize_t> m_count;
  238. details::Semaphore m_sema;
  239. int m_maxSpins;
  240. bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
  241. {
  242. ssize_t oldCount;
  243. int spin = m_maxSpins;
  244. while (--spin >= 0)
  245. {
  246. oldCount = m_count.load(std::memory_order_relaxed);
  247. if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  248. return true;
  249. std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
  250. }
  251. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  252. if (oldCount > 0)
  253. return true;
  254. if (timeout_usecs < 0)
  255. {
  256. if (m_sema.wait())
  257. return true;
  258. }
  259. if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
  260. return true;
  261. // At this point, we've timed out waiting for the semaphore, but the
  262. // count is still decremented indicating we may still be waiting on
  263. // it. So we have to re-adjust the count, but only if the semaphore
  264. // wasn't signaled enough times for us too since then. If it was, we
  265. // need to release the semaphore too.
  266. while (true)
  267. {
  268. oldCount = m_count.load(std::memory_order_acquire);
  269. if (oldCount >= 0 && m_sema.try_wait())
  270. return true;
  271. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  272. return false;
  273. }
  274. }
  275. ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
  276. {
  277. assert(max > 0);
  278. ssize_t oldCount;
  279. int spin = m_maxSpins;
  280. while (--spin >= 0)
  281. {
  282. oldCount = m_count.load(std::memory_order_relaxed);
  283. if (oldCount > 0)
  284. {
  285. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  286. if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  287. return oldCount - newCount;
  288. }
  289. std::atomic_signal_fence(std::memory_order_acquire);
  290. }
  291. oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
  292. if (oldCount <= 0)
  293. {
  294. if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
  295. {
  296. while (true)
  297. {
  298. oldCount = m_count.load(std::memory_order_acquire);
  299. if (oldCount >= 0 && m_sema.try_wait())
  300. break;
  301. if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
  302. return 0;
  303. }
  304. }
  305. }
  306. if (max > 1)
  307. return 1 + tryWaitMany(max - 1);
  308. return 1;
  309. }
  310. public:
  311. LightweightSemaphore(ssize_t initialCount = 0, int maxSpins = 10000) : m_count(initialCount), m_maxSpins(maxSpins)
  312. {
  313. assert(initialCount >= 0);
  314. assert(maxSpins >= 0);
  315. }
  316. bool tryWait()
  317. {
  318. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  319. while (oldCount > 0)
  320. {
  321. if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
  322. return true;
  323. }
  324. return false;
  325. }
  326. bool wait()
  327. {
  328. return tryWait() || waitWithPartialSpinning();
  329. }
  330. bool wait(std::int64_t timeout_usecs)
  331. {
  332. return tryWait() || waitWithPartialSpinning(timeout_usecs);
  333. }
  334. // Acquires between 0 and (greedily) max, inclusive
  335. ssize_t tryWaitMany(ssize_t max)
  336. {
  337. assert(max >= 0);
  338. ssize_t oldCount = m_count.load(std::memory_order_relaxed);
  339. while (oldCount > 0)
  340. {
  341. ssize_t newCount = oldCount > max ? oldCount - max : 0;
  342. if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
  343. return oldCount - newCount;
  344. }
  345. return 0;
  346. }
  347. // Acquires at least one, and (greedily) at most max
  348. ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
  349. {
  350. assert(max >= 0);
  351. ssize_t result = tryWaitMany(max);
  352. if (result == 0 && max > 0)
  353. result = waitManyWithPartialSpinning(max, timeout_usecs);
  354. return result;
  355. }
  356. ssize_t waitMany(ssize_t max)
  357. {
  358. ssize_t result = waitMany(max, -1);
  359. assert(result > 0);
  360. return result;
  361. }
  362. void signal(ssize_t count = 1)
  363. {
  364. assert(count >= 0);
  365. ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
  366. ssize_t toRelease = -oldCount < count ? -oldCount : count;
  367. if (toRelease > 0)
  368. {
  369. m_sema.signal((int)toRelease);
  370. }
  371. }
  372. std::size_t availableApprox() const
  373. {
  374. ssize_t count = m_count.load(std::memory_order_relaxed);
  375. return count > 0 ? static_cast<std::size_t>(count) : 0;
  376. }
  377. };
  378. } // end namespace moodycamel