// TurnSequencer.h
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <algorithm>
  18. #include <limits>
  19. #include <folly/detail/Futex.h>
  20. #include <folly/portability/Asm.h>
  21. #include <folly/portability/Unistd.h>
  22. #include <glog/logging.h>
  23. namespace folly {
  24. namespace detail {
  25. /// A TurnSequencer allows threads to order their execution according to
  26. /// a monotonically increasing (with wraparound) "turn" value. The two
  27. /// operations provided are to wait for turn T, and to move to the next
  28. /// turn. Every thread that is waiting for T must have arrived before
  29. /// that turn is marked completed (for MPMCQueue only one thread waits
  30. /// for any particular turn, so this is trivially true).
  31. ///
  32. /// TurnSequencer's state_ holds 26 bits of the current turn (shifted
  33. /// left by 6), along with a 6 bit saturating value that records the
  34. /// maximum waiter minus the current turn. Wraparound of the turn space
  35. /// is expected and handled. This allows us to atomically adjust the
  36. /// number of outstanding waiters when we perform a FUTEX_WAKE operation.
  37. /// Compare this strategy to sem_t's separate num_waiters field, which
  38. /// isn't decremented until after the waiting thread gets scheduled,
  39. /// during which time more enqueues might have occurred and made pointless
  40. /// FUTEX_WAKE calls.
  41. ///
  42. /// TurnSequencer uses futex() directly. It is optimized for the
  43. /// case that the highest awaited turn is 32 or less higher than the
  44. /// current turn. We use the FUTEX_WAIT_BITSET variant, which lets
  45. /// us embed 32 separate wakeup channels in a single futex. See
  46. /// http://locklessinc.com/articles/futex_cheat_sheet for a description.
  47. ///
  48. /// We only need to keep exact track of the delta between the current
  49. /// turn and the maximum waiter for the 32 turns that follow the current
  50. /// one, because waiters at turn t+32 will be awoken at turn t. At that
  51. /// point they can then adjust the delta using the higher base. Since we
  52. /// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
  53. /// We actually store waiter deltas up to 63, since that might reduce
  54. /// the number of CAS operations a tiny bit.
  55. ///
  56. /// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
  57. /// spin cutoff before waiting. The overheads (and convergence rate)
  58. /// of separately tracking the spin cutoff for each TurnSequencer would
  59. /// be prohibitive, so the actual storage is passed in as a parameter and
  60. /// updated atomically. This also lets the caller use different adaptive
  61. /// cutoffs for different operations (read versus write, for example).
  62. /// To avoid contention, the spin cutoff is only updated when requested
  63. /// by the caller.
  64. template <template <typename> class Atom>
  65. struct TurnSequencer {
  66. explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
  67. : state_(encode(firstTurn << kTurnShift, 0)) {}
  68. /// Returns true iff a call to waitForTurn(turn, ...) won't block
  69. bool isTurn(const uint32_t turn) const noexcept {
  70. auto state = state_.load(std::memory_order_acquire);
  71. return decodeCurrentSturn(state) == (turn << kTurnShift);
  72. }
  73. enum class TryWaitResult { SUCCESS, PAST, TIMEDOUT };
  74. /// See tryWaitForTurn
  75. /// Requires that `turn` is not a turn in the past.
  76. void waitForTurn(
  77. const uint32_t turn,
  78. Atom<uint32_t>& spinCutoff,
  79. const bool updateSpinCutoff) noexcept {
  80. const auto ret = tryWaitForTurn(turn, spinCutoff, updateSpinCutoff);
  81. DCHECK(ret == TryWaitResult::SUCCESS);
  82. }
  83. // Internally we always work with shifted turn values, which makes the
  84. // truncation and wraparound work correctly. This leaves us bits at
  85. // the bottom to store the number of waiters. We call shifted turns
  86. // "sturns" inside this class.
  87. /// Blocks the current thread until turn has arrived. If
  88. /// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
  89. /// before blocking and will adjust spinCutoff based on the results,
  90. /// otherwise it will spin for at most spinCutoff spins.
  91. /// Returns SUCCESS if the wait succeeded, PAST if the turn is in the past
  92. /// or TIMEDOUT if the absTime time value is not nullptr and is reached before
  93. /// the turn arrives
  94. template <
  95. class Clock = std::chrono::steady_clock,
  96. class Duration = typename Clock::duration>
  97. TryWaitResult tryWaitForTurn(
  98. const uint32_t turn,
  99. Atom<uint32_t>& spinCutoff,
  100. const bool updateSpinCutoff,
  101. const std::chrono::time_point<Clock, Duration>* absTime =
  102. nullptr) noexcept {
  103. uint32_t prevThresh = spinCutoff.load(std::memory_order_relaxed);
  104. const uint32_t effectiveSpinCutoff =
  105. updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
  106. uint32_t tries;
  107. const uint32_t sturn = turn << kTurnShift;
  108. for (tries = 0;; ++tries) {
  109. uint32_t state = state_.load(std::memory_order_acquire);
  110. uint32_t current_sturn = decodeCurrentSturn(state);
  111. if (current_sturn == sturn) {
  112. break;
  113. }
  114. // wrap-safe version of (current_sturn >= sturn)
  115. if (sturn - current_sturn >= std::numeric_limits<uint32_t>::max() / 2) {
  116. // turn is in the past
  117. return TryWaitResult::PAST;
  118. }
  119. // the first effectSpinCutoff tries are spins, after that we will
  120. // record ourself as a waiter and block with futexWait
  121. if (tries < effectiveSpinCutoff) {
  122. asm_volatile_pause();
  123. continue;
  124. }
  125. uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
  126. uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
  127. uint32_t new_state;
  128. if (our_waiter_delta <= current_max_waiter_delta) {
  129. // state already records us as waiters, probably because this
  130. // isn't our first time around this loop
  131. new_state = state;
  132. } else {
  133. new_state = encode(current_sturn, our_waiter_delta);
  134. if (state != new_state &&
  135. !state_.compare_exchange_strong(state, new_state)) {
  136. continue;
  137. }
  138. }
  139. if (absTime) {
  140. auto futexResult = detail::futexWaitUntil(
  141. &state_, new_state, *absTime, futexChannel(turn));
  142. if (futexResult == FutexResult::TIMEDOUT) {
  143. return TryWaitResult::TIMEDOUT;
  144. }
  145. } else {
  146. detail::futexWait(&state_, new_state, futexChannel(turn));
  147. }
  148. }
  149. if (updateSpinCutoff || prevThresh == 0) {
  150. // if we hit kMaxSpins then spinning was pointless, so the right
  151. // spinCutoff is kMinSpins
  152. uint32_t target;
  153. if (tries >= kMaxSpins) {
  154. target = kMinSpins;
  155. } else {
  156. // to account for variations, we allow ourself to spin 2*N when
  157. // we think that N is actually required in order to succeed
  158. target = std::min<uint32_t>(
  159. kMaxSpins, std::max<uint32_t>(kMinSpins, tries * 2));
  160. }
  161. if (prevThresh == 0) {
  162. // bootstrap
  163. spinCutoff.store(target);
  164. } else {
  165. // try once, keep moving if CAS fails. Exponential moving average
  166. // with alpha of 7/8
  167. // Be careful that the quantity we add to prevThresh is signed.
  168. spinCutoff.compare_exchange_weak(
  169. prevThresh, prevThresh + int(target - prevThresh) / 8);
  170. }
  171. }
  172. return TryWaitResult::SUCCESS;
  173. }
  174. /// Unblocks a thread running waitForTurn(turn + 1)
  175. void completeTurn(const uint32_t turn) noexcept {
  176. uint32_t state = state_.load(std::memory_order_acquire);
  177. while (true) {
  178. DCHECK(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
  179. uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
  180. uint32_t new_state = encode(
  181. (turn + 1) << kTurnShift,
  182. max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
  183. if (state_.compare_exchange_strong(state, new_state)) {
  184. if (max_waiter_delta != 0) {
  185. detail::futexWake(
  186. &state_, std::numeric_limits<int>::max(), futexChannel(turn + 1));
  187. }
  188. break;
  189. }
  190. // failing compare_exchange_strong updates first arg to the value
  191. // that caused the failure, so no need to reread state_
  192. }
  193. }
  194. /// Returns the least-most significant byte of the current uncompleted
  195. /// turn. The full 32 bit turn cannot be recovered.
  196. uint8_t uncompletedTurnLSB() const noexcept {
  197. return uint8_t(state_.load(std::memory_order_acquire) >> kTurnShift);
  198. }
  199. private:
  200. enum : uint32_t {
  201. /// kTurnShift counts the bits that are stolen to record the delta
  202. /// between the current turn and the maximum waiter. It needs to be big
  203. /// enough to record wait deltas of 0 to 32 inclusive. Waiters more
  204. /// than 32 in the future will be woken up 32*n turns early (since
  205. /// their BITSET will hit) and will adjust the waiter count again.
  206. /// We go a bit beyond and let the waiter count go up to 63, which
  207. /// is free and might save us a few CAS
  208. kTurnShift = 6,
  209. kWaitersMask = (1 << kTurnShift) - 1,
  210. /// The minimum spin count that we will adaptively select
  211. kMinSpins = 20,
  212. /// The maximum spin count that we will adaptively select, and the
  213. /// spin count that will be used when probing to get a new data point
  214. /// for the adaptation
  215. kMaxSpins = 2000,
  216. };
  217. /// This holds both the current turn, and the highest waiting turn,
  218. /// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn))
  219. Futex<Atom> state_;
  220. /// Returns the bitmask to pass futexWait or futexWake when communicating
  221. /// about the specified turn
  222. uint32_t futexChannel(uint32_t turn) const noexcept {
  223. return 1u << (turn & 31);
  224. }
  225. uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
  226. return state & ~kWaitersMask;
  227. }
  228. uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
  229. return state & kWaitersMask;
  230. }
  231. uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
  232. return currentSturn | std::min(uint32_t{kWaitersMask}, maxWaiterD);
  233. }
  234. };
  235. } // namespace detail
  236. } // namespace folly