LifoSem.h

/*
 * Copyright 2014-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <type_traits>
#include <utility>

#include <folly/CPortability.h>
#include <folly/CachelinePadded.h>
#include <folly/IndexedMemPool.h>
#include <folly/Likely.h>
#include <folly/lang/SafeAssert.h>
#include <folly/synchronization/AtomicStruct.h>
#include <folly/synchronization/SaturatingSemaphore.h>

namespace folly {

template <
    template <typename> class Atom = std::atomic,
    class BatonType = SaturatingSemaphore<true, Atom>>
struct LifoSemImpl;

/// LifoSem is a semaphore that wakes its waiters in a manner intended to
/// maximize performance rather than fairness. It should be preferred
/// to a mutex+condvar or POSIX sem_t solution when all of the waiters
/// are equivalent. It is faster than a condvar or sem_t, and it has a
/// shutdown state that might save you a lot of complexity when it comes
/// time to shut down your work pipelines. LifoSem is larger than sem_t,
/// but that is only because it uses padding and alignment to avoid
/// false sharing.
///
/// LifoSem allows multi-post and multi-tryWait, and provides a shutdown
/// state that awakens all waiters. LifoSem is faster than sem_t because
/// it performs exact wakeups, so it often requires fewer system calls.
/// It provides all of the functionality of sem_t except for timed waiting.
/// It is called LifoSem because its wakeup policy is approximately LIFO,
/// rather than the usual FIFO.
///
/// The core semaphore operations provided are:
///
/// -- post() -- if there is a pending waiter, wake it up, otherwise
/// increment the value of the semaphore. If the value of the semaphore
/// is already 2^32-1, does nothing. Compare to sem_post().
///
/// -- post(n) -- equivalent to n calls to post(), but much more efficient.
/// sem_t has no equivalent to this method.
///
/// -- bool tryWait() -- if the semaphore's value is positive, decrements it
/// and returns true, otherwise returns false. Compare to sem_trywait().
///
/// -- uint32_t tryWait(uint32_t n) -- attempts to decrement the semaphore's
/// value by n, returning the amount by which it actually was decremented
/// (a value from 0 to n inclusive). Not atomic. Equivalent to n calls
/// to tryWait(). sem_t has no equivalent to this method.
///
/// -- wait() -- waits until tryWait() can succeed. Compare to sem_wait().
///
/// -- timed wait variants -- will wait until the timeout. Note that when
/// these time out, the current implementation takes a lock, blocking
/// concurrent pushes and pops. (If timed wait calls are
/// substantial, consider re-working this code to be lock-free.)
///
/// LifoSem also has the notion of a shutdown state, in which any calls
/// that would block (or are already blocked) throw ShutdownSemError.
/// Note the difference between a wait() call that can complete immediately
/// and one that would have to block: in the former case tryWait() would
/// succeed and no isShutdown() check is performed, while in the latter
/// case an exception is thrown. This behavior allows a LifoSem controlling
/// work distribution to drain. If you want to immediately stop all waiting
/// on shutdown, you can just check isShutdown() yourself (preferably
/// wrapped in an UNLIKELY). This fast-stop behavior is easy to add, but
/// difficult to remove if you want the draining behavior, which is why we
/// have chosen draining as the default.
///
/// All LifoSem operations except valueGuess() are guaranteed to be
/// linearizable.
typedef LifoSemImpl<> LifoSem;
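
// Illustrative usage sketch (not part of the original header). It shows one
// way a producer/consumer pipeline might use LifoSem; pushWork(), popWork(),
// makeWorkItem(), and processWork() are hypothetical helpers standing in for
// a caller-provided work queue. post() publishes one unit of work, wait()
// consumes one, and shutdown() makes any wait() that would otherwise block
// throw ShutdownSemError so consumers drain remaining work and then exit.
//
//   folly::LifoSem workSem;
//
//   void producer() {
//     pushWork(makeWorkItem()); // hypothetical: enqueue the item
//     workSem.post();           // wake one consumer (or bank a unit)
//   }
//
//   void consumer() {
//     try {
//       while (true) {
//         workSem.wait();       // blocks until a unit is available
//         processWork(popWork());
//       }
//     } catch (const folly::ShutdownSemError&) {
//       // workSem.shutdown() was called and no banked units remain
//     }
//   }
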
/// The exception thrown when wait()ing on an isShutdown() LifoSem
struct FOLLY_EXPORT ShutdownSemError : public std::runtime_error {
  explicit ShutdownSemError(const std::string& msg);
  ~ShutdownSemError() noexcept override;
};

namespace detail {

// Internally, a LifoSem is either a value or a linked list of wait nodes.
// This union is captured in the LifoSemHead type, which holds either a
// value or an indexed pointer to the list. LifoSemHead itself is a value
// type; the head is a mutable atomic box containing a LifoSemHead value.
// Each wait node corresponds to exactly one waiter. Values can flow
// through the semaphore either by going into and out of the head's value,
// or by direct communication from a poster to a waiter. The former path
// is taken when there are no pending waiters, the latter otherwise. The
// general flow of a post is to try to increment the value or pop-and-post
// a wait node. Either of those has the effect of conveying one semaphore
// unit. Waiting is the opposite, either a decrement of the value or
// push-and-wait of a wait node. The generic LifoSemBase abstracts the
// actual mechanism by which a wait node's post->wait communication is
// performed, which is why we have LifoSemRawNode and LifoSemNode.

/// LifoSemRawNode is the actual pooled storage that backs LifoSemNode
/// for user-specified Handoff types. This is done so that we can have
/// a large static IndexedMemPool of nodes, instead of per-type pools.
template <template <typename> class Atom>
struct LifoSemRawNode {
  std::aligned_storage<sizeof(void*), alignof(void*)>::type raw;

  /// The IndexedMemPool index of the next node in this chain, or 0
  /// if none. This will be set to uint32_t(-1) if the node is being
  /// posted due to a shutdown-induced wakeup
  uint32_t next;

  bool isShutdownNotice() const {
    return next == uint32_t(-1);
  }
  void clearShutdownNotice() {
    next = 0;
  }
  void setShutdownNotice() {
    next = uint32_t(-1);
  }

  typedef folly::IndexedMemPool<LifoSemRawNode<Atom>, 32, 200, Atom> Pool;

  /// Storage for all of the waiter nodes for LifoSem-s that use Atom
  static Pool& pool();
};

/// Use this macro to declare the static storage that backs the raw nodes
/// for the specified atomic type
#define LIFOSEM_DECLARE_POOL(Atom, capacity)                  \
  namespace folly {                                           \
  namespace detail {                                          \
  template <>                                                 \
  LifoSemRawNode<Atom>::Pool& LifoSemRawNode<Atom>::pool() {  \
    static Pool* instance = new Pool((capacity));             \
    return *instance;                                         \
  }                                                           \
  }                                                           \
  }
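
// For example (illustrative only; the capacity here is arbitrary), one
// translation unit in a program could back the std::atomic-based nodes with:
//
//   LIFOSEM_DECLARE_POOL(std::atomic, 100000)
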
/// Handoff is a type not bigger than a void* that knows how to perform a
/// single post() -> wait() communication. It must have a post() method.
/// If it has a wait() method then LifoSemBase's wait() implementation
/// will work out of the box, otherwise you will need to specialize
/// LifoSemBase::wait accordingly.
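
// Illustrative sketch of a minimal Handoff (an assumption for exposition,
// not a type defined by folly): a pointer-sized flag whose post() releases
// the waiter and whose wait() spins. The default BatonType,
// SaturatingSemaphore, plays this role in practice and blocks rather than
// spinning; note that the timed-wait path below additionally calls
// try_wait_until() on the handoff.
//
//   struct SpinHandoff {
//     std::atomic<uint32_t> signaled{0};
//     void post() {
//       signaled.store(1, std::memory_order_release);
//     }
//     void wait() {
//       while (signaled.load(std::memory_order_acquire) == 0) {
//         std::this_thread::yield();
//       }
//     }
//   };
//   static_assert(sizeof(SpinHandoff) <= sizeof(void*), "must fit in raw");
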
template <typename Handoff, template <typename> class Atom>
struct LifoSemNode : public LifoSemRawNode<Atom> {
  static_assert(
      sizeof(Handoff) <= sizeof(LifoSemRawNode<Atom>::raw),
      "Handoff too big for small-object optimization, use indirection");
  static_assert(
      alignof(Handoff) <= alignof(decltype(LifoSemRawNode<Atom>::raw)),
      "Handoff alignment constraint not satisfied");

  template <typename... Args>
  void init(Args&&... args) {
    new (&this->raw) Handoff(std::forward<Args>(args)...);
  }

  void destroy() {
    handoff().~Handoff();
#ifndef NDEBUG
    memset(&this->raw, 'F', sizeof(this->raw));
#endif
  }

  Handoff& handoff() {
    return *static_cast<Handoff*>(static_cast<void*>(&this->raw));
  }

  const Handoff& handoff() const {
    return *static_cast<const Handoff*>(static_cast<const void*>(&this->raw));
  }
};

template <typename Handoff, template <typename> class Atom>
struct LifoSemNodeRecycler {
  void operator()(LifoSemNode<Handoff, Atom>* elem) const {
    elem->destroy();
    auto idx = LifoSemRawNode<Atom>::pool().locateElem(elem);
    LifoSemRawNode<Atom>::pool().recycleIndex(idx);
  }
};

/// LifoSemHead is a 64-bit struct that holds a 32-bit value, some state
/// bits, and a sequence number used to avoid ABA problems in the lock-free
/// management of the LifoSem's wait lists. The value can either hold
/// an integral semaphore value (if there are no waiters) or a node index
/// (see IndexedMemPool) for the head of a list of wait nodes
class LifoSemHead {
  // What we really want are bitfields:
  //   uint64_t data : 32; uint64_t isNodeIdx : 1; uint64_t seq : 31;
  // Unfortunately g++ generates pretty bad code for this sometimes (I saw
  // -O3 code from gcc 4.7.1 copying the bitfields one at a time instead of
  // in bulk, for example). We can generate better code anyway by assuming
  // that setters won't be given values that cause under/overflow, and
  // putting the sequence at the end where its planned overflow doesn't
  // need any masking.
  //
  // data == 0 (empty list) with isNodeIdx is conceptually the same
  // as data == 0 (no unclaimed increments) with !isNodeIdx; we always
  // convert the former into the latter to make the logic simpler.
  enum {
    IsNodeIdxShift = 32,
    IsShutdownShift = 33,
    IsLockedShift = 34,
    SeqShift = 35,
  };
  enum : uint64_t {
    IsNodeIdxMask = uint64_t(1) << IsNodeIdxShift,
    IsShutdownMask = uint64_t(1) << IsShutdownShift,
    IsLockedMask = uint64_t(1) << IsLockedShift,
    SeqIncr = uint64_t(1) << SeqShift,
    SeqMask = ~(SeqIncr - 1),
  };

 public:
  uint64_t bits;

  //////// getters

  inline uint32_t idx() const {
    assert(isNodeIdx());
    assert(uint32_t(bits) != 0);
    return uint32_t(bits);
  }
  inline uint32_t value() const {
    assert(!isNodeIdx());
    return uint32_t(bits);
  }
  inline constexpr bool isNodeIdx() const {
    return (bits & IsNodeIdxMask) != 0;
  }
  inline constexpr bool isShutdown() const {
    return (bits & IsShutdownMask) != 0;
  }
  inline constexpr bool isLocked() const {
    return (bits & IsLockedMask) != 0;
  }
  inline constexpr uint32_t seq() const {
    return uint32_t(bits >> SeqShift);
  }

  //////// setter-like things return a new struct

  /// This should only be used for initial construction, not for setting
  /// the value, because it clears the sequence number
  static inline constexpr LifoSemHead fresh(uint32_t value) {
    return LifoSemHead{value};
  }

  /// Returns the LifoSemHead that results from popping a waiter node,
  /// given the current waiter node's next ptr
  inline LifoSemHead withPop(uint32_t idxNext) const {
    assert(!isLocked());
    assert(isNodeIdx());
    if (idxNext == 0) {
      // no isNodeIdx bit or data bits. Wraparound of seq bits is okay
      return LifoSemHead{(bits & (SeqMask | IsShutdownMask)) + SeqIncr};
    } else {
      // preserve sequence bits (incremented with wraparound okay) and
      // isNodeIdx bit, replace all data bits
      return LifoSemHead{
          (bits & (SeqMask | IsShutdownMask | IsNodeIdxMask)) + SeqIncr +
          idxNext};
    }
  }

  /// Returns the LifoSemHead that results from pushing a new waiter node
  inline LifoSemHead withPush(uint32_t _idx) const {
    assert(!isLocked());
    assert(isNodeIdx() || value() == 0);
    assert(!isShutdown());
    assert(_idx != 0);
    return LifoSemHead{(bits & SeqMask) | IsNodeIdxMask | _idx};
  }

  /// Returns the LifoSemHead with value increased by delta, with
  /// saturation if the maximum value is reached
  inline LifoSemHead withValueIncr(uint32_t delta) const {
    assert(!isLocked());
    assert(!isNodeIdx());
    auto rv = LifoSemHead{bits + SeqIncr + delta};
    if (UNLIKELY(rv.isNodeIdx())) {
      // value has overflowed into the isNodeIdx bit
      rv = LifoSemHead{(rv.bits & ~IsNodeIdxMask) | (IsNodeIdxMask - 1)};
    }
    return rv;
  }

  /// Returns the LifoSemHead that results from decrementing the value
  inline LifoSemHead withValueDecr(uint32_t delta) const {
    assert(!isLocked());
    assert(delta > 0 && delta <= value());
    return LifoSemHead{bits + SeqIncr - delta};
  }

  /// Returns the LifoSemHead with the same state as the current node,
  /// but with the shutdown bit set
  inline LifoSemHead withShutdown() const {
    return LifoSemHead{bits | IsShutdownMask};
  }

  // Returns LifoSemHead with lock bit set, but rest of bits unchanged.
  inline LifoSemHead withLock() const {
    assert(!isLocked());
    return LifoSemHead{bits | IsLockedMask};
  }

  // Returns LifoSemHead with lock bit unset, and updated seqno based
  // on idx.
  inline LifoSemHead withoutLock(uint32_t idxNext) const {
    assert(isLocked());
    // We need to treat this as a pop, as we may change the list head.
    return LifoSemHead{bits & ~IsLockedMask}.withPop(idxNext);
  }

  inline constexpr bool operator==(const LifoSemHead& rhs) const {
    return bits == rhs.bits;
  }
  inline constexpr bool operator!=(const LifoSemHead& rhs) const {
    return !(*this == rhs);
  }
};
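
// Worked example of the packing above (illustrative, not from the original
// source): a head with no waiters, value 5, neither shut down nor locked,
// and sequence number 7 has
//   bits == (uint64_t(7) << SeqShift) | 5
// whereas a head whose wait list starts at node index 3 sets the node-index
// flag instead of holding a value:
//   bits == (uint64_t(seq) << SeqShift) | IsNodeIdxMask | 3
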
/// LifoSemBase is the engine for several different types of LIFO
/// semaphore. LifoSemBase handles storage of positive semaphore values
/// and wait nodes, but the actual waiting and notification mechanism is
/// up to the client.
///
/// The Handoff type is responsible for arranging one wakeup notification.
/// See LifoSemNode for more information on how to make your own.
template <typename Handoff, template <typename> class Atom = std::atomic>
struct LifoSemBase {
  /// Constructor
  constexpr explicit LifoSemBase(uint32_t initialValue = 0)
      : head_(LifoSemHead::fresh(initialValue)) {}

  LifoSemBase(LifoSemBase const&) = delete;
  LifoSemBase& operator=(LifoSemBase const&) = delete;

  /// Silently saturates if value is already 2^32-1
  bool post() {
    auto idx = incrOrPop(1);
    if (idx != 0) {
      idxToNode(idx).handoff().post();
      return true;
    }
    return false;
  }

  /// Equivalent to n calls to post(), except may be much more efficient.
  /// At any point in time at which the semaphore's value would exceed
  /// 2^32-1 if tracked with infinite precision, it may be silently
  /// truncated to 2^32-1. This saturation is not guaranteed to be exact,
  /// although it is guaranteed that overflow won't result in wrap-around.
  /// There would be a substantial performance and complexity cost in
  /// guaranteeing exact saturation (similar to the cost of maintaining
  /// linearizability near the zero value, but without as much of
  /// a benefit).
  void post(uint32_t n) {
    uint32_t idx;
    while (n > 0 && (idx = incrOrPop(n)) != 0) {
      // pop accounts for only 1
      idxToNode(idx).handoff().post();
      --n;
    }
  }

  /// Returns true iff shutdown() has been called
  bool isShutdown() const {
    return UNLIKELY(head_->load(std::memory_order_acquire).isShutdown());
  }

  /// Prevents blocking on this semaphore, causing all blocking wait()
  /// calls to throw ShutdownSemError. Both currently blocked wait() and
  /// future calls to wait() for which tryWait() would return false will
  /// cause an exception. Calls to wait() for which the matching post()
  /// has already occurred will proceed normally.
  void shutdown() {
    // first set the shutdown bit
    auto h = head_->load(std::memory_order_acquire);
    while (!h.isShutdown()) {
      if (head_->compare_exchange_strong(h, h.withShutdown())) {
        // success
        h = h.withShutdown();
        break;
      }
      // compare_exchange_strong rereads h, retry
    }

    // now wake up any waiters
    while (h.isNodeIdx()) {
      if (h.isLocked()) {
        std::this_thread::yield();
        h = head_->load(std::memory_order_acquire);
        continue;
      }
      auto& node = idxToNode(h.idx());
      auto repl = h.withPop(node.next);
      if (head_->compare_exchange_strong(h, repl)) {
        // successful pop, wake up the waiter and move on. The next
        // field is used to convey that this wakeup didn't consume a value
        node.setShutdownNotice();
        node.handoff().post();
        h = repl;
      }
    }
  }

  /// Returns true iff value was decremented
  bool tryWait() {
    uint32_t n = 1;
    auto rv = decrOrPush(n, 0);
    assert(
        (rv == WaitResult::DECR && n == 0) ||
        (rv != WaitResult::DECR && n == 1));
    // SHUTDOWN is okay here, since we don't actually wait
    return rv == WaitResult::DECR;
  }

  /// Equivalent to (but may be much more efficient than) n calls to
  /// tryWait(). Returns the total amount by which the semaphore's value
  /// was decreased
  uint32_t tryWait(uint32_t n) {
    auto const orig = n;
    while (n > 0) {
#ifndef NDEBUG
      auto prev = n;
#endif
      auto rv = decrOrPush(n, 0);
      assert(
          (rv == WaitResult::DECR && n < prev) ||
          (rv != WaitResult::DECR && n == prev));
      if (rv != WaitResult::DECR) {
        break;
      }
    }
    return orig - n;
  }

  /// Blocks the current thread until there is a matching post or the
  /// semaphore is shut down. Throws ShutdownSemError if the semaphore
  /// has been shut down and this method would otherwise be blocking.
  /// Note that wait() doesn't throw during shutdown if tryWait() would
  /// return true
  void wait() {
    auto const deadline = std::chrono::steady_clock::time_point::max();
    auto res = try_wait_until(deadline);
    FOLLY_SAFE_DCHECK(res, "infinity time has passed");
  }

  template <typename Rep, typename Period>
  bool try_wait_for(const std::chrono::duration<Rep, Period>& timeout) {
    return try_wait_until(timeout + std::chrono::steady_clock::now());
  }

  template <typename Clock, typename Duration>
  bool try_wait_until(
      const std::chrono::time_point<Clock, Duration>& deadline) {
    // early check isn't required for correctness, but is an important
    // perf win if we can avoid allocating and deallocating a node
    if (tryWait()) {
      return true;
    }

    // allocateNode() won't compile unless Handoff has a default
    // constructor
    UniquePtr node = allocateNode();

    auto rv = tryWaitOrPush(*node);
    if (UNLIKELY(rv == WaitResult::SHUTDOWN)) {
      assert(isShutdown());
      throw ShutdownSemError("wait() would block but semaphore is shut down");
    }

    if (rv == WaitResult::PUSH) {
      if (!node->handoff().try_wait_until(deadline)) {
        if (tryRemoveNode(*node)) {
          return false;
        } else {
          // We could not remove our node. Return to waiting.
          //
          // This only happens if we lose a removal race with post(),
          // so we are not likely to wait long. This is only
          // necessary to ensure we don't return node's memory back to
          // IndexedMemPool before post() has had a chance to post to
          // handoff(). In a stronger memory reclamation scheme, such
          // as hazptr or rcu, this would not be necessary.
          node->handoff().wait();
        }
      }
      if (UNLIKELY(node->isShutdownNotice())) {
        // this wait() didn't consume a value, it was triggered by shutdown
        throw ShutdownSemError(
            "blocking wait() interrupted by semaphore shutdown");
      }

      // node->handoff().wait() can't return until after the node has
      // been popped and post()ed, so it is okay for the UniquePtr to
      // recycle the node now
    }
    // else node wasn't pushed, so it is safe to recycle

    return true;
  }

  /// Returns a guess at the current value, designed for debugging.
  /// If there are no concurrent posters or waiters then this will
  /// be correct
  uint32_t valueGuess() const {
    // this is actually linearizable, but we don't promise that because
    // we may want to add striping in the future to help under heavy
    // contention
    auto h = head_->load(std::memory_order_acquire);
    return h.isNodeIdx() ? 0 : h.value();
  }

 protected:
  enum class WaitResult {
    PUSH,
    DECR,
    SHUTDOWN,
  };

  /// The type of a std::unique_ptr that will automatically return a
  /// LifoSemNode to the appropriate IndexedMemPool
  typedef std::unique_ptr<
      LifoSemNode<Handoff, Atom>,
      LifoSemNodeRecycler<Handoff, Atom>>
      UniquePtr;

  /// Returns a node that can be passed to decrOrPush
  template <typename... Args>
  UniquePtr allocateNode(Args&&... args) {
    auto idx = LifoSemRawNode<Atom>::pool().allocIndex();
    if (idx != 0) {
      auto& node = idxToNode(idx);
      node.clearShutdownNotice();
      try {
        node.init(std::forward<Args>(args)...);
      } catch (...) {
        LifoSemRawNode<Atom>::pool().recycleIndex(idx);
        throw;
      }
      return UniquePtr(&node);
    } else {
      return UniquePtr();
    }
  }

  /// Returns DECR if the semaphore value was decremented (and waiterNode
  /// was untouched), PUSH if a reference to the wait node was pushed,
  /// or SHUTDOWN if decrement was not possible and push wasn't allowed
  /// because isShutdown(). Ownership of the wait node remains the
  /// responsibility of the caller, who must not release it until after
  /// the node's Handoff has been posted.
  WaitResult tryWaitOrPush(LifoSemNode<Handoff, Atom>& waiterNode) {
    uint32_t n = 1;
    return decrOrPush(n, nodeToIdx(waiterNode));
  }

 private:
  CachelinePadded<folly::AtomicStruct<LifoSemHead, Atom>> head_;

  static LifoSemNode<Handoff, Atom>& idxToNode(uint32_t idx) {
    auto raw = &LifoSemRawNode<Atom>::pool()[idx];
    return *static_cast<LifoSemNode<Handoff, Atom>*>(raw);
  }

  static uint32_t nodeToIdx(const LifoSemNode<Handoff, Atom>& node) {
    return LifoSemRawNode<Atom>::pool().locateElem(&node);
  }

  // Locks the list head (blocking concurrent pushes and pops)
  // and attempts to remove this node. Returns true if node was
  // found and removed, false if not found.
  bool tryRemoveNode(const LifoSemNode<Handoff, Atom>& removenode) {
    auto removeidx = nodeToIdx(removenode);
    auto head = head_->load(std::memory_order_acquire);
    // Try to lock the head.
    while (true) {
      if (head.isLocked()) {
        std::this_thread::yield();
        head = head_->load(std::memory_order_acquire);
        continue;
      }
      if (!head.isNodeIdx()) {
        return false;
      }
      if (head_->compare_exchange_weak(
              head,
              head.withLock(),
              std::memory_order_acquire,
              std::memory_order_relaxed)) {
        break;
      }
    }
    // Update local var to what head_ is, for better assert() checking.
    head = head.withLock();
    bool result = false;
    auto idx = head.idx();
    if (idx == removeidx) {
      // pop from head. Head seqno is updated.
      head_->store(
          head.withoutLock(removenode.next), std::memory_order_release);
      return true;
    }
    auto node = &idxToNode(idx);
    idx = node->next;
    while (idx) {
      if (idx == removeidx) {
        // Pop from mid-list.
        node->next = removenode.next;
        result = true;
        break;
      }
      node = &idxToNode(idx);
      idx = node->next;
    }
    // Unlock and return result
    head_->store(head.withoutLock(head.idx()), std::memory_order_release);
    return result;
  }

  /// Either increments by n and returns 0, or pops a node and returns it.
  /// If n + the stripe's value overflows, then the stripe's value
  /// saturates silently at 2^32-1
  uint32_t incrOrPop(uint32_t n) {
    while (true) {
      assert(n > 0);

      auto head = head_->load(std::memory_order_acquire);
      if (head.isLocked()) {
        std::this_thread::yield();
        continue;
      }
      if (head.isNodeIdx()) {
        auto& node = idxToNode(head.idx());
        if (head_->compare_exchange_strong(head, head.withPop(node.next))) {
          // successful pop
          return head.idx();
        }
      } else {
        auto after = head.withValueIncr(n);
        if (head_->compare_exchange_strong(head, after)) {
          // successful incr
          return 0;
        }
      }
      // retry
    }
  }

  /// Returns DECR if some amount was decremented, with that amount
  /// subtracted from n. If n is 1 and this function returns DECR then n
  /// must be 0 afterward. Returns PUSH if no value could be decremented
  /// and idx was pushed, or if idx was zero and no push was performed but
  /// a push would have been performed with a valid node. Returns SHUTDOWN
  /// if the caller should have blocked but isShutdown(). If idx == 0,
  /// may return PUSH even after isShutdown() or may return SHUTDOWN
  WaitResult decrOrPush(uint32_t& n, uint32_t idx) {
    assert(n > 0);

    while (true) {
      auto head = head_->load(std::memory_order_acquire);
      if (head.isLocked()) {
        std::this_thread::yield();
        continue;
      }
      if (!head.isNodeIdx() && head.value() > 0) {
        // decr
        auto delta = std::min(n, head.value());
        if (head_->compare_exchange_strong(head, head.withValueDecr(delta))) {
          n -= delta;
          return WaitResult::DECR;
        }
      } else {
        // push
        if (idx == 0) {
          return WaitResult::PUSH;
        }
        if (UNLIKELY(head.isShutdown())) {
          return WaitResult::SHUTDOWN;
        }
        auto& node = idxToNode(idx);
        node.next = head.isNodeIdx() ? head.idx() : 0;
        if (head_->compare_exchange_strong(head, head.withPush(idx))) {
          // push succeeded
          return WaitResult::PUSH;
        }
      }
    }
    // retry
  }
};

} // namespace detail

template <template <typename> class Atom, class BatonType>
struct LifoSemImpl : public detail::LifoSemBase<BatonType, Atom> {
  constexpr explicit LifoSemImpl(uint32_t v = 0)
      : detail::LifoSemBase<BatonType, Atom>(v) {}
};

} // namespace folly