123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- /*
- * Copyright 2017-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #pragma once
- #include <folly/Likely.h>
- #include <folly/detail/Futex.h>
- #include <folly/detail/MemoryIdler.h>
- #include <folly/portability/Asm.h>
- #include <folly/synchronization/WaitOptions.h>
- #include <folly/synchronization/detail/Spin.h>
- #include <glog/logging.h>
- #include <atomic>
- namespace folly {
- /// SaturatingSemaphore is a flag that allows concurrent posting by
- /// multiple posters and concurrent non-destructive waiting by
- /// multiple waiters.
- ///
- /// A SaturatingSemaphore allows one or more waiter threads to check,
- /// spin, or block, indefinitely or with timeout, for a flag to be set
- /// by one or more poster threads. By setting the flag, posters
- /// announce to waiters (that may be already waiting or will check
- /// the flag in the future) that some condition is true. Posts to an
- /// already set flag are idempotent.
- ///
- /// SaturatingSemaphore is called so because it behaves like a hybrid
- /// binary/counted _semaphore_ with values zero and infinity, and
- /// post() and wait() functions. It is called _saturating_ because one
- /// post() is enough to set it to infinity and to satisfy any number
- /// of wait()-s. Once set (to infinity) it remains unchanged by
- /// subsequent post()-s and wait()-s, until it is reset() back to
- /// zero.
- ///
- /// The implementation of SaturatingSemaphore is based on that of
- /// Baton. It includes no internal padding, and is only 4 bytes in
- /// size. Any alignment or padding to avoid false sharing is up to
- /// the user.
- /// SaturatingSemaphore differs from Baton as follows:
- /// - Baton allows at most one call to post(); this allows any number
- /// and concurrently.
- /// - Baton allows at most one successful call to any wait variant;
- /// this allows any number and concurrently.
- ///
- /// Template parameter:
- /// - bool MayBlock: If false, waiting operations spin only. If
- /// true, timed and wait operations may block; adds an atomic
- /// instruction to the critical path of posters.
- ///
- /// Wait options:
- /// WaitOptions contains optional per call setting for spin-max duration:
- /// Calls to wait(), try_wait_until(), and try_wait_for() block only after the
- /// passage of the spin-max period. The default spin-max duration is 10 usec.
- /// The spin-max option is applicable only if MayBlock is true.
- ///
- /// Functions:
- /// bool ready():
- /// Returns true if the flag is set by a call to post, otherwise false.
- /// Equivalent to try_wait, but available on const receivers.
- /// void reset();
- /// Clears the flag.
- /// void post();
- /// Sets the flag and wakes all current waiters, i.e., causes all
- /// concurrent calls to wait, try_wait_for, and try_wait_until to
- /// return.
- /// void wait(
- /// WaitOptions opt = wait_options());
- /// Waits for the flag to be set by a call to post.
- /// bool try_wait();
- /// Returns true if the flag is set by a call to post, otherwise false.
- /// bool try_wait_until(
- /// time_point& deadline,
- /// WaitOptions& = wait_options());
- /// Returns true if the flag is set by a call to post before the
- /// deadline, otherwise false.
- /// bool try_wait_for(
- /// duration&,
- /// WaitOptions& = wait_options());
- /// Returns true if the flag is set by a call to post before the
- /// expiration of the specified duration, otherwise false.
- ///
- /// Usage:
- /// @code
- /// SaturatingSemaphore</* MayBlock = */ true> f;
- /// ASSERT_FALSE(f.try_wait());
- /// ASSERT_FALSE(f.try_wait_until(
- /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
- /// ASSERT_FALSE(f.try_wait_until(
- /// std::chrono::steady_clock::now() + std::chrono::microseconds(1),
- /// f.wait_options().spin_max(std::chrono::microseconds(1))));
- /// f.post();
- /// f.post();
- /// f.wait();
- /// f.wait(f.wait_options().spin_max(std::chrono::nanoseconds(100)));
- /// ASSERT_TRUE(f.try_wait());
- /// ASSERT_TRUE(f.try_wait_until(
- /// std::chrono::steady_clock::now() + std::chrono::microseconds(1)));
- /// f.wait();
- /// f.reset();
- /// ASSERT_FALSE(f.try_wait());
- /// @endcode
- template <bool MayBlock, template <typename> class Atom = std::atomic>
- class SaturatingSemaphore {
- detail::Futex<Atom> state_;
- enum State : uint32_t {
- NOTREADY = 0,
- READY = 1,
- BLOCKED = 2,
- };
- public:
- FOLLY_ALWAYS_INLINE static WaitOptions wait_options() {
- return {};
- }
- /** constructor */
- constexpr SaturatingSemaphore() noexcept : state_(NOTREADY) {}
- /** destructor */
- ~SaturatingSemaphore() {}
- /** ready */
- FOLLY_ALWAYS_INLINE bool ready() const noexcept {
- return state_.load(std::memory_order_acquire) == READY;
- }
- /** reset */
- void reset() noexcept {
- state_.store(NOTREADY, std::memory_order_relaxed);
- }
- /** post */
- FOLLY_ALWAYS_INLINE void post() noexcept {
- if (!MayBlock) {
- state_.store(READY, std::memory_order_release);
- } else {
- postFastWaiterMayBlock();
- }
- }
- /** wait */
- FOLLY_ALWAYS_INLINE
- void wait(const WaitOptions& opt = wait_options()) noexcept {
- try_wait_until(std::chrono::steady_clock::time_point::max(), opt);
- }
- /** try_wait */
- FOLLY_ALWAYS_INLINE bool try_wait() noexcept {
- return ready();
- }
- /** try_wait_until */
- template <typename Clock, typename Duration>
- FOLLY_ALWAYS_INLINE bool try_wait_until(
- const std::chrono::time_point<Clock, Duration>& deadline,
- const WaitOptions& opt = wait_options()) noexcept {
- if (LIKELY(try_wait())) {
- return true;
- }
- return tryWaitSlow(deadline, opt);
- }
- /** try_wait_for */
- template <class Rep, class Period>
- FOLLY_ALWAYS_INLINE bool try_wait_for(
- const std::chrono::duration<Rep, Period>& duration,
- const WaitOptions& opt = wait_options()) noexcept {
- if (LIKELY(try_wait())) {
- return true;
- }
- auto deadline = std::chrono::steady_clock::now() + duration;
- return tryWaitSlow(deadline, opt);
- }
- private:
- FOLLY_ALWAYS_INLINE void postFastWaiterMayBlock() noexcept {
- uint32_t before = NOTREADY;
- if (LIKELY(state_.compare_exchange_strong(
- before,
- READY,
- std::memory_order_release,
- std::memory_order_relaxed))) {
- return;
- }
- postSlowWaiterMayBlock(before);
- }
- void postSlowWaiterMayBlock(uint32_t before) noexcept; // defined below
- template <typename Clock, typename Duration>
- bool tryWaitSlow(
- const std::chrono::time_point<Clock, Duration>& deadline,
- const WaitOptions& opt) noexcept; // defined below
- };
- ///
- /// Member function definitioons
- ///
- /** postSlowWaiterMayBlock */
- template <bool MayBlock, template <typename> class Atom>
- FOLLY_NOINLINE void SaturatingSemaphore<MayBlock, Atom>::postSlowWaiterMayBlock(
- uint32_t before) noexcept {
- while (true) {
- if (before == NOTREADY) {
- if (state_.compare_exchange_strong(
- before,
- READY,
- std::memory_order_release,
- std::memory_order_relaxed)) {
- return;
- }
- }
- if (before == READY) { // Only if multiple posters
- // The reason for not simply returning (without the following
- // steps) is to prevent the following case:
- //
- // T1: T2: T3:
- // local1.post(); local2.post(); global.wait();
- // global.post(); global.post(); global.reset();
- // seq_cst fence
- // local1.try_wait() == true;
- // local2.try_wait() == false;
- //
- // This following steps correspond to T2's global.post(), where
- // global is already posted by T1.
- //
- // The following fence and load guarantee that T3 does not miss
- // T2's prior stores, i.e., local2.post() in this example.
- //
- // The following case is prevented:
- //
- // Starting with local2 == NOTREADY and global == READY
- //
- // T2: T3:
- // store READY to local2 // post store NOTREADY to global // reset
- // seq_cst fenc seq_cst fence
- // load READY from global // post load NOTREADY from local2 // try_wait
- //
- std::atomic_thread_fence(std::memory_order_seq_cst);
- before = state_.load(std::memory_order_relaxed);
- if (before == READY) {
- return;
- }
- continue;
- }
- DCHECK_EQ(before, BLOCKED);
- if (state_.compare_exchange_strong(
- before,
- READY,
- std::memory_order_release,
- std::memory_order_relaxed)) {
- detail::futexWake(&state_);
- return;
- }
- }
- }
- /** tryWaitSlow */
- template <bool MayBlock, template <typename> class Atom>
- template <typename Clock, typename Duration>
- FOLLY_NOINLINE bool SaturatingSemaphore<MayBlock, Atom>::tryWaitSlow(
- const std::chrono::time_point<Clock, Duration>& deadline,
- const WaitOptions& opt) noexcept {
- switch (detail::spin_pause_until(deadline, opt, [=] { return ready(); })) {
- case detail::spin_result::success:
- return true;
- case detail::spin_result::timeout:
- return false;
- case detail::spin_result::advance:
- break;
- }
- if (!MayBlock) {
- switch (detail::spin_yield_until(deadline, [=] { return ready(); })) {
- case detail::spin_result::success:
- return true;
- case detail::spin_result::timeout:
- return false;
- case detail::spin_result::advance:
- break;
- }
- }
- auto before = state_.load(std::memory_order_relaxed);
- while (before == NOTREADY &&
- !state_.compare_exchange_strong(
- before,
- BLOCKED,
- std::memory_order_relaxed,
- std::memory_order_relaxed)) {
- if (before == READY) {
- // TODO: move the acquire to the compare_exchange failure load after C++17
- std::atomic_thread_fence(std::memory_order_acquire);
- return true;
- }
- }
- while (true) {
- auto rv = detail::MemoryIdler::futexWaitUntil(state_, BLOCKED, deadline);
- if (rv == detail::FutexResult::TIMEDOUT) {
- assert(deadline != (std::chrono::time_point<Clock, Duration>::max()));
- return false;
- }
- if (ready()) {
- return true;
- }
- }
- }
- } // namespace folly
|