ObserverManager.cpp 5.6 KB


  1. /*
  2. * Copyright 2016-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/experimental/observer/detail/ObserverManager.h>
  17. #include <folly/ExceptionString.h>
  18. #include <folly/Format.h>
  19. #include <folly/MPMCQueue.h>
  20. #include <folly/Range.h>
  21. #include <folly/Singleton.h>
  22. #include <folly/portability/GFlags.h>
  23. #include <folly/system/ThreadName.h>
  24. namespace folly {
  25. namespace observer_detail {
  26. FOLLY_TLS bool ObserverManager::inManagerThread_{false};
  27. FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
  28. ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
  29. DEFINE_int32(
  30. observer_manager_pool_size,
  31. 4,
  32. "How many internal threads ObserverManager should use");
  33. static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
  34. namespace {
  35. constexpr size_t kCurrentQueueSize{10 * 1024};
  36. constexpr size_t kNextQueueSize{10 * 1024};
  37. } // namespace
  38. class ObserverManager::CurrentQueue {
  39. public:
  40. CurrentQueue() : queue_(kCurrentQueueSize) {
  41. if (FLAGS_observer_manager_pool_size < 1) {
  42. LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
  43. FLAGS_observer_manager_pool_size = 1;
  44. }
  45. for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
  46. threads_.emplace_back([this, i]() {
  47. folly::setThreadName(
  48. folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
  49. ObserverManager::inManagerThread_ = true;
  50. while (true) {
  51. Function<void()> task;
  52. queue_.blockingRead(task);
  53. if (!task) {
  54. return;
  55. }
  56. try {
  57. task();
  58. } catch (...) {
  59. LOG(ERROR) << "Exception while running CurrentQueue task: "
  60. << exceptionStr(std::current_exception());
  61. }
  62. }
  63. });
  64. }
  65. }
  66. ~CurrentQueue() {
  67. for (size_t i = 0; i < threads_.size(); ++i) {
  68. queue_.blockingWrite(nullptr);
  69. }
  70. for (auto& thread : threads_) {
  71. thread.join();
  72. }
  73. CHECK(queue_.isEmpty());
  74. }
  75. void add(Function<void()> task) {
  76. if (ObserverManager::inManagerThread()) {
  77. if (!queue_.write(std::move(task))) {
  78. throw std::runtime_error("Too many Observers scheduled for update.");
  79. }
  80. } else {
  81. queue_.blockingWrite(std::move(task));
  82. }
  83. }
  84. private:
  85. MPMCQueue<Function<void()>> queue_;
  86. std::vector<std::thread> threads_;
  87. };
  88. class ObserverManager::NextQueue {
  89. public:
  90. explicit NextQueue(ObserverManager& manager)
  91. : manager_(manager), queue_(kNextQueueSize) {
  92. thread_ = std::thread([&]() {
  93. Core::WeakPtr queueCoreWeak;
  94. while (true) {
  95. queue_.blockingRead(queueCoreWeak);
  96. if (stop_) {
  97. return;
  98. }
  99. std::vector<Core::Ptr> cores;
  100. {
  101. auto queueCore = queueCoreWeak.lock();
  102. if (!queueCore) {
  103. continue;
  104. }
  105. cores.emplace_back(std::move(queueCore));
  106. }
  107. {
  108. SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
  109. // We can't pick more tasks from the queue after we bumped the
  110. // version, so we have to do this while holding the lock.
  111. while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
  112. if (stop_) {
  113. return;
  114. }
  115. if (auto queueCore = queueCoreWeak.lock()) {
  116. cores.emplace_back(std::move(queueCore));
  117. }
  118. }
  119. ++manager_.version_;
  120. }
  121. for (auto& core : cores) {
  122. manager_.scheduleRefresh(std::move(core), manager_.version_, true);
  123. }
  124. }
  125. });
  126. }
  127. void add(Core::WeakPtr core) {
  128. queue_.blockingWrite(std::move(core));
  129. }
  130. ~NextQueue() {
  131. stop_ = true;
  132. // Write to the queue to notify the thread.
  133. queue_.blockingWrite(Core::WeakPtr());
  134. thread_.join();
  135. }
  136. private:
  137. ObserverManager& manager_;
  138. MPMCQueue<Core::WeakPtr> queue_;
  139. std::thread thread_;
  140. std::atomic<bool> stop_{false};
  141. };
  142. ObserverManager::ObserverManager() {
  143. currentQueue_ = std::make_unique<CurrentQueue>();
  144. nextQueue_ = std::make_unique<NextQueue>(*this);
  145. }
  146. ObserverManager::~ObserverManager() {
  147. // Destroy NextQueue, before the rest of this object, since it expects
  148. // ObserverManager to be alive.
  149. nextQueue_.reset();
  150. currentQueue_.reset();
  151. }
  152. void ObserverManager::scheduleCurrent(Function<void()> task) {
  153. currentQueue_->add(std::move(task));
  154. }
  155. void ObserverManager::scheduleNext(Core::WeakPtr core) {
  156. nextQueue_->add(std::move(core));
  157. }
  158. struct ObserverManager::Singleton {
  159. static folly::Singleton<ObserverManager> instance;
  160. // MSVC 2015 doesn't let us access ObserverManager's constructor if we
  161. // try to use a lambda to initialize instance, so we have to create
  162. // an actual function instead.
  163. static ObserverManager* createManager() {
  164. return new ObserverManager();
  165. }
  166. };
  167. folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
  168. createManager);
  169. std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
  170. return Singleton::instance.try_get();
  171. }
  172. } // namespace observer_detail
  173. } // namespace folly