SaturatingSemaphore.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <folly/Likely.h>
  18. #include <folly/detail/Futex.h>
  19. #include <folly/detail/MemoryIdler.h>
  20. #include <folly/portability/Asm.h>
  21. #include <folly/synchronization/WaitOptions.h>
  22. #include <folly/synchronization/detail/Spin.h>
  23. #include <glog/logging.h>
  24. #include <atomic>
  25. namespace folly {
  26. /// SaturatingSemaphore is a flag that allows concurrent posting by
  27. /// multiple posters and concurrent non-destructive waiting by
  28. /// multiple waiters.
  29. ///
  30. /// A SaturatingSemaphore allows one or more waiter threads to check,
  31. /// spin, or block, indefinitely or with timeout, for a flag to be set
  32. /// by one or more poster threads. By setting the flag, posters
  33. /// announce to waiters (that may be already waiting or will check
  34. /// the flag in the future) that some condition is true. Posts to an
  35. /// already set flag are idempotent.
  36. ///
  37. /// SaturatingSemaphore is called so because it behaves like a hybrid
  38. /// binary/counted _semaphore_ with values zero and infinity, and
  39. /// post() and wait() functions. It is called _saturating_ because one
  40. /// post() is enough to set it to infinity and to satisfy any number
  41. /// of wait()-s. Once set (to infinity) it remains unchanged by
  42. /// subsequent post()-s and wait()-s, until it is reset() back to
  43. /// zero.
  44. ///
  45. /// The implementation of SaturatingSemaphore is based on that of
  46. /// Baton. It includes no internal padding, and is only 4 bytes in
  47. /// size. Any alignment or padding to avoid false sharing is up to
  48. /// the user.
  49. /// SaturatingSemaphore differs from Baton as follows:
  50. /// - Baton allows at most one call to post(); this allows any number
  51. /// and concurrently.
  52. /// - Baton allows at most one successful call to any wait variant;
  53. /// this allows any number and concurrently.
  54. ///
  55. /// Template parameter:
  56. /// - bool MayBlock: If false, waiting operations spin only. If
  57. /// true, timed and wait operations may block; adds an atomic
  58. /// instruction to the critical path of posters.
  59. ///
  60. /// Wait options:
  61. /// WaitOptions contains optional per call setting for spin-max duration:
  62. /// Calls to wait(), try_wait_until(), and try_wait_for() block only after the
  63. /// passage of the spin-max period. The default spin-max duration is 10 usec.
  64. /// The spin-max option is applicable only if MayBlock is true.
  65. ///
  66. /// Functions:
  67. /// bool ready():
  68. /// Returns true if the flag is set by a call to post, otherwise false.
  69. /// Equivalent to try_wait, but available on const receivers.
  70. /// void reset();
  71. /// Clears the flag.
  72. /// void post();
  73. /// Sets the flag and wakes all current waiters, i.e., causes all
  74. /// concurrent calls to wait, try_wait_for, and try_wait_until to
  75. /// return.
  76. /// void wait(
  77. /// WaitOptions opt = wait_options());
  78. /// Waits for the flag to be set by a call to post.
  79. /// bool try_wait();
  80. /// Returns true if the flag is set by a call to post, otherwise false.
  81. /// bool try_wait_until(
  82. /// time_point& deadline,
  83. /// WaitOptions& = wait_options());
  84. /// Returns true if the flag is set by a call to post before the
  85. /// deadline, otherwise false.
  86. /// bool try_wait_for(
  87. /// duration&,
  88. /// WaitOptions& = wait_options());
  89. /// Returns true if the flag is set by a call to post before the
  90. /// expiration of the specified duration, otherwise false.
  91. ///
  92. /// Usage:
  93. /// @code
  94. /// SaturatingSemaphore</* MayBlock = */ true> f;
  95. /// ASSERT_FALSE(f.try_wait());
  96. /// ASSERT_FALSE(f.try_wait_until(
  97. /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
  98. /// ASSERT_FALSE(f.try_wait_until(
  99. /// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
  100. /// f.wait_options().spin_max(std::chrono::microseconds(1))));
  101. /// f.post();
  102. /// f.post();
  103. /// f.wait();
  104. /// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100)));
  105. /// ASSERT_TRUE(f.try_wait());
  106. /// ASSERT_TRUE(f.try_wait_until(
  107. /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
  108. /// f.wait();
  109. /// f.reset();
  110. /// ASSERT_FALSE(f.try_wait());
  111. /// @endcode
  112. template <bool MayBlock, template <typename> class Atom = std::atomic>
  113. class SaturatingSemaphore {
  114. detail::Futex<Atom> state_;
  115. enum State : uint32_t {
  116. NOTREADY = 0,
  117. READY = 1,
  118. BLOCKED = 2,
  119. };
  120. public:
  121. FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
  122. return {};
  123. }
  124. /** constructor */
  125. constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
  126. /** destructor */
  127. ~SaturatingSemaphore() {}
  128. /** ready */
  129. FOLLY_ALWAYS_INLINE bool ready() const noexcept {
  130. return state_.load(std::memory_order_acquire) == READY;
  131. }
  132. /** reset */
  133. void reset() noexcept {
  134. state_.store(NOTREADY, std::memory_order_relaxed);
  135. }
  136. /** post */
  137. FOLLY_ALWAYS_INLINE void post() noexcept {
  138. if (!MayBlock) {
  139. state_.store(READY, std::memory_order_release);
  140. } else {
  141. postFastWaiterMayBlock();
  142. }
  143. }
  144. /** wait */
  145. FOLLY_ALWAYS_INLINE
  146. void wait(const WaitOptions& opt = wait_options()) noexcept {
  147. try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
  148. }
  149. /** try_wait */
  150. FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
  151. return ready();
  152. }
  153. /** try_wait_until */
  154. template <typename Clock, typename Duration>
  155. FOLLY_ALWAYS_INLINE bool try_wait_until(
  156. const std::chrono::time_point<Clock, Duration>& deadline,
  157. const WaitOptions& opt = wait_options()) noexcept {
  158. if (LIKELY(try_wait())) {
  159. return true;
  160. }
  161. return tryWaitSlow(deadline, opt);
  162. }
  163. /** try_wait_for */
  164. template <class Rep, class Period>
  165. FOLLY_ALWAYS_INLINE bool try_wait_for(
  166. const std::chrono::duration<Rep, Period>& duration,
  167. const WaitOptions& opt = wait_options()) noexcept {
  168. if (LIKELY(try_wait())) {
  169. return true;
  170. }
  171. auto deadline = std::chrono::steady_clock::now() + duration;
  172. return tryWaitSlow(deadline, opt);
  173. }
  174. private:
  175. FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
  176. uint32_t before = NOTREADY;
  177. if (LIKELY(state_.compare_exchange_strong(
  178. before,
  179. READY,
  180. std::memory_order_release,
  181. std::memory_order_relaxed))) {
  182. return;
  183. }
  184. postSlowWaiterMayBlock(before);
  185. }
  186. void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
  187. template <typename Clock, typename Duration>
  188. bool tryWaitSlow(
  189. const std::chrono::time_point<Clock, Duration>& deadline,
  190. const WaitOptions& opt) noexcept; // defined below
  191. };
  192. ///
  193. /// Member function definitions
  194. ///
  195. /** postSlowWaiterMayBlock */
  196. template <bool MayBlock, template <typename> class Atom>
  197. FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
  198. uint32_t before) noexcept {
  199. while (true) {
  200. if (before == NOTREADY) {
  201. if (state_.compare_exchange_strong(
  202. before,
  203. READY,
  204. std::memory_order_release,
  205. std::memory_order_relaxed)) {
  206. return;
  207. }
  208. }
  209. if (before == READY) { // Only if multiple posters
  210. // The reason for not simply returning (without the following
  211. // steps) is to prevent the following case:
  212. //
  213. // T1: T2: T3:
  214. // local1.post(); local2.post(); global.wait();
  215. // global.post(); global.post(); global.reset();
  216. // seq_cst fence
  217. // local1.try_wait() == true;
  218. // local2.try_wait() == false;
  219. //
  220. // This following steps correspond to T2's global.post(), where
  221. // global is already posted by T1.
  222. //
  223. // The following fence and load guarantee that T3 does not miss
  224. // T2's prior stores, i.e., local2.post() in this example.
  225. //
  226. // The following case is prevented:
  227. //
  228. // Starting with local2 == NOTREADY and global == READY
  229. //
  230. // T2: T3:
  231. // store READY to local2 // post store NOTREADY to global // reset
  232. // seq_cst fenc seq_cst fence
  233. // load READY from global // post load NOTREADY from local2 // try_wait
  234. //
  235. std::atomic_thread_fence(std::memory_order_seq_cst);
  236. before = state_.load(std::memory_order_relaxed);
  237. if (before == READY) {
  238. return;
  239. }
  240. continue;
  241. }
  242. DCHECK_EQ(before, BLOCKED);
  243. if (state_.compare_exchange_strong(
  244. before,
  245. READY,
  246. std::memory_order_release,
  247. std::memory_order_relaxed)) {
  248. detail::futexWake(&state_);
  249. return;
  250. }
  251. }
  252. }
  253. /** tryWaitSlow */
  254. template <bool MayBlock, template <typename> class Atom>
  255. template <typename Clock, typename Duration>
  256. FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
  257. const std::chrono::time_point<Clock, Duration>& deadline,
  258. const WaitOptions& opt) noexcept {
  259. switch (detail::spin_pause_until(deadline, opt, [=] { return ready(); })) {
  260. case detail::spin_result::success:
  261. return true;
  262. case detail::spin_result::timeout:
  263. return false;
  264. case detail::spin_result::advance:
  265. break;
  266. }
  267. if (!MayBlock) {
  268. switch (detail::spin_yield_until(deadline, [=] { return ready(); })) {
  269. case detail::spin_result::success:
  270. return true;
  271. case detail::spin_result::timeout:
  272. return false;
  273. case detail::spin_result::advance:
  274. break;
  275. }
  276. }
  277. auto before = state_.load(std::memory_order_relaxed);
  278. while (before == NOTREADY &&
  279. !state_.compare_exchange_strong(
  280. before,
  281. BLOCKED,
  282. std::memory_order_relaxed,
  283. std::memory_order_relaxed)) {
  284. if (before == READY) {
  285. // TODO: move the acquire to the compare_exchange failure load after C++17
  286. std::atomic_thread_fence(std::memory_order_acquire);
  287. return true;
  288. }
  289. }
  290. while (true) {
  291. auto rv = detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline);
  292. if (rv == detail::FutexResult::TIMEDOUT) {
  293. assert(deadline != (std::chrono::time_point<Clock, Duration>::max()));
  294. return false;
  295. }
  296. if (ready()) {
  297. return true;
  298. }
  299. }
  300. }
  301. } // namespace folly