ThreadWheelTimekeeper.cpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/futures/ThreadWheelTimekeeper.h>
  17. #include <folly/Singleton.h>
  18. #include <folly/futures/Future.h>
  19. #include <future>
  20. namespace folly {
  21. namespace {
  22. Singleton<ThreadWheelTimekeeper> timekeeperSingleton_;
  23. // Our Callback object for HHWheelTimer
  24. struct WTCallback : public std::enable_shared_from_this<WTCallback>,
  25. public folly::HHWheelTimer::Callback {
  26. struct PrivateConstructorTag {};
  27. public:
  28. WTCallback(PrivateConstructorTag, EventBase* base) : base_(base) {}
  29. // Only allow creation by this factory, to ensure heap allocation.
  30. static std::shared_ptr<WTCallback> create(EventBase* base) {
  31. // optimization opportunity: memory pool
  32. auto cob = std::make_shared<WTCallback>(PrivateConstructorTag{}, base);
  33. // Capture shared_ptr of cob in lambda so that Core inside Promise will
  34. // hold a ref count to it. The ref count will be released when Core goes
  35. // away which happens when both Promise and Future go away
  36. cob->promise_.setInterruptHandler(
  37. [cob](exception_wrapper ew) { cob->interruptHandler(std::move(ew)); });
  38. return cob;
  39. }
  40. Future<Unit> getFuture() {
  41. return promise_.getFuture();
  42. }
  43. FOLLY_NODISCARD Promise<Unit> stealPromise() {
  44. // Don't need promise anymore. Break the circular reference as promise_
  45. // is holding a ref count to us via Core. Core won't go away until both
  46. // Promise and Future go away.
  47. return std::move(promise_);
  48. }
  49. protected:
  50. folly::Synchronized<EventBase*> base_;
  51. Promise<Unit> promise_;
  52. void timeoutExpired() noexcept override {
  53. base_ = nullptr;
  54. // Don't need Promise anymore, break the circular reference
  55. auto promise = stealPromise();
  56. if (!promise.isFulfilled()) {
  57. promise.setValue();
  58. }
  59. }
  60. void callbackCanceled() noexcept override {
  61. base_ = nullptr;
  62. // Don't need Promise anymore, break the circular reference
  63. auto promise = stealPromise();
  64. if (!promise.isFulfilled()) {
  65. promise.setException(FutureNoTimekeeper{});
  66. }
  67. }
  68. void interruptHandler(exception_wrapper ew) {
  69. auto rBase = base_.rlock();
  70. if (!*rBase) {
  71. return;
  72. }
  73. // Capture shared_ptr of self in lambda, if we don't do this, object
  74. // may go away before the lambda is executed from event base thread.
  75. // This is not racing with timeoutExpired anymore because this is called
  76. // through Future, which means Core is still alive and keeping a ref count
  77. // on us, so what timeouExpired is doing won't make the object go away
  78. (*rBase)->runInEventBaseThread(
  79. [me = shared_from_this(), ew = std::move(ew)]() mutable {
  80. me->cancelTimeout();
  81. // Don't need Promise anymore, break the circular reference
  82. auto promise = me->stealPromise();
  83. if (!promise.isFulfilled()) {
  84. promise.setException(std::move(ew));
  85. }
  86. });
  87. }
  88. };
  89. } // namespace
  90. ThreadWheelTimekeeper::ThreadWheelTimekeeper()
  91. : thread_([this] { eventBase_.loopForever(); }),
  92. wheelTimer_(
  93. HHWheelTimer::newTimer(&eventBase_, std::chrono::milliseconds(1))) {
  94. eventBase_.waitUntilRunning();
  95. eventBase_.runInEventBaseThread([this] {
  96. // 15 characters max
  97. eventBase_.setName("FutureTimekeepr");
  98. });
  99. }
  100. ThreadWheelTimekeeper::~ThreadWheelTimekeeper() {
  101. eventBase_.runInEventBaseThreadAndWait([this] {
  102. wheelTimer_->cancelAll();
  103. eventBase_.terminateLoopSoon();
  104. });
  105. thread_.join();
  106. }
  107. Future<Unit> ThreadWheelTimekeeper::after(Duration dur) {
  108. auto cob = WTCallback::create(&eventBase_);
  109. auto f = cob->getFuture();
  110. //
  111. // Even shared_ptr of cob is captured in lambda this is still somewhat *racy*
  112. // because it will be released once timeout is scheduled. So technically there
  113. // is no gurantee that EventBase thread can safely call timeout callback.
  114. // However due to fact that we are having circular reference here:
  115. // WTCallback->Promise->Core->WTCallbak, so three of them won't go away until
  116. // we break the circular reference. The break happens either in
  117. // WTCallback::timeoutExpired or WTCallback::interruptHandler. Former means
  118. // timeout callback is being safely executed. Latter captures shared_ptr of
  119. // WTCallback again in another lambda for canceling timeout. The moment
  120. // canceling timeout is executed in EventBase thread, the actual timeout
  121. // callback has either been executed, or will never be executed. So we are
  122. // fine here.
  123. //
  124. if (!eventBase_.runInEventBaseThread(
  125. [this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); })) {
  126. // Release promise to break the circular reference. Because if
  127. // scheduleTimeout fails, there is nothing to *promise*. Internally
  128. // Core would automatically set an exception result when Promise is
  129. // destructed before fulfilling.
  130. // This is either called from EventBase thread, or here.
  131. // They are somewhat racy but given the rare chance this could fail,
  132. // I don't see it is introducing any problem yet.
  133. auto promise = cob->stealPromise();
  134. if (!promise.isFulfilled()) {
  135. promise.setException(FutureNoTimekeeper{});
  136. }
  137. }
  138. return f;
  139. }
  140. namespace detail {
  141. std::shared_ptr<Timekeeper> getTimekeeperSingleton() {
  142. return timekeeperSingleton_.try_get();
  143. }
  144. } // namespace detail
  145. } // namespace folly