/* * Copyright 2016-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include namespace folly { namespace observer_detail { FOLLY_TLS bool ObserverManager::inManagerThread_{false}; FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies* ObserverManager::DependencyRecorder::currentDependencies_{nullptr}; DEFINE_int32( observer_manager_pool_size, 4, "How many internal threads ObserverManager should use"); static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"}; namespace { constexpr size_t kCurrentQueueSize{10 * 1024}; constexpr size_t kNextQueueSize{10 * 1024}; } // namespace class ObserverManager::CurrentQueue { public: CurrentQueue() : queue_(kCurrentQueueSize) { if (FLAGS_observer_manager_pool_size < 1) { LOG(ERROR) << "--observer_manager_pool_size should be >= 1"; FLAGS_observer_manager_pool_size = 1; } for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) { threads_.emplace_back([this, i]() { folly::setThreadName( folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i)); ObserverManager::inManagerThread_ = true; while (true) { Function task; queue_.blockingRead(task); if (!task) { return; } try { task(); } catch (...) { LOG(ERROR) << "Exception while running CurrentQueue task: " << exceptionStr(std::current_exception()); } } }); } } ~CurrentQueue() { for (size_t i = 0; i < threads_.size(); ++i) { queue_.blockingWrite(nullptr); } for (auto& thread : threads_) { thread.join(); } CHECK(queue_.isEmpty()); } void add(Function task) { if (ObserverManager::inManagerThread()) { if (!queue_.write(std::move(task))) { throw std::runtime_error("Too many Observers scheduled for update."); } } else { queue_.blockingWrite(std::move(task)); } } private: MPMCQueue> queue_; std::vector threads_; }; class ObserverManager::NextQueue { public: explicit NextQueue(ObserverManager& manager) : manager_(manager), queue_(kNextQueueSize) { thread_ = std::thread([&]() { Core::WeakPtr queueCoreWeak; while (true) { queue_.blockingRead(queueCoreWeak); if (stop_) { return; } std::vector cores; { auto queueCore = queueCoreWeak.lock(); if (!queueCore) { continue; } cores.emplace_back(std::move(queueCore)); } { SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_); // We can't pick more tasks from the queue after we bumped the // version, so we have to do this while holding the lock. while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) { if (stop_) { return; } if (auto queueCore = queueCoreWeak.lock()) { cores.emplace_back(std::move(queueCore)); } } ++manager_.version_; } for (auto& core : cores) { manager_.scheduleRefresh(std::move(core), manager_.version_, true); } } }); } void add(Core::WeakPtr core) { queue_.blockingWrite(std::move(core)); } ~NextQueue() { stop_ = true; // Write to the queue to notify the thread. queue_.blockingWrite(Core::WeakPtr()); thread_.join(); } private: ObserverManager& manager_; MPMCQueue queue_; std::thread thread_; std::atomic stop_{false}; }; ObserverManager::ObserverManager() { currentQueue_ = std::make_unique(); nextQueue_ = std::make_unique(*this); } ObserverManager::~ObserverManager() { // Destroy NextQueue, before the rest of this object, since it expects // ObserverManager to be alive. nextQueue_.reset(); currentQueue_.reset(); } void ObserverManager::scheduleCurrent(Function task) { currentQueue_->add(std::move(task)); } void ObserverManager::scheduleNext(Core::WeakPtr core) { nextQueue_->add(std::move(core)); } struct ObserverManager::Singleton { static folly::Singleton instance; // MSVC 2015 doesn't let us access ObserverManager's constructor if we // try to use a lambda to initialize instance, so we have to create // an actual function instead. static ObserverManager* createManager() { return new ObserverManager(); } }; folly::Singleton ObserverManager::Singleton::instance( createManager); std::shared_ptr ObserverManager::getInstance() { return Singleton::instance.try_get(); } } // namespace observer_detail } // namespace folly