CPUThreadPoolExecutor.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/executors/CPUThreadPoolExecutor.h>
  17. #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
  18. #include <folly/portability/GFlags.h>
  19. DEFINE_bool(
  20. dynamic_cputhreadpoolexecutor,
  21. true,
  22. "CPUThreadPoolExecutor will dynamically create and destroy threads");
  23. namespace folly {
  24. const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
  25. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  26. size_t numThreads,
  27. std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
  28. std::shared_ptr<ThreadFactory> threadFactory)
  29. : ThreadPoolExecutor(
  30. numThreads,
  31. FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
  32. std::move(threadFactory)),
  33. taskQueue_(std::move(taskQueue)) {
  34. setNumThreads(numThreads);
  35. }
  36. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  37. std::pair<size_t, size_t> numThreads,
  38. std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
  39. std::shared_ptr<ThreadFactory> threadFactory)
  40. : ThreadPoolExecutor(
  41. numThreads.first,
  42. numThreads.second,
  43. std::move(threadFactory)),
  44. taskQueue_(std::move(taskQueue)) {
  45. setNumThreads(numThreads.first);
  46. }
  47. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  48. size_t numThreads,
  49. std::shared_ptr<ThreadFactory> threadFactory)
  50. : CPUThreadPoolExecutor(
  51. numThreads,
  52. std::make_unique<LifoSemMPMCQueue<CPUTask>>(
  53. CPUThreadPoolExecutor::kDefaultMaxQueueSize),
  54. std::move(threadFactory)) {}
  55. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  56. std::pair<size_t, size_t> numThreads,
  57. std::shared_ptr<ThreadFactory> threadFactory)
  58. : CPUThreadPoolExecutor(
  59. numThreads,
  60. std::make_unique<LifoSemMPMCQueue<CPUTask>>(
  61. CPUThreadPoolExecutor::kDefaultMaxQueueSize),
  62. std::move(threadFactory)) {}
  63. CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
  64. : CPUThreadPoolExecutor(
  65. numThreads,
  66. std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
  67. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  68. size_t numThreads,
  69. int8_t numPriorities,
  70. std::shared_ptr<ThreadFactory> threadFactory)
  71. : CPUThreadPoolExecutor(
  72. numThreads,
  73. std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
  74. numPriorities,
  75. CPUThreadPoolExecutor::kDefaultMaxQueueSize),
  76. std::move(threadFactory)) {}
  77. CPUThreadPoolExecutor::CPUThreadPoolExecutor(
  78. size_t numThreads,
  79. int8_t numPriorities,
  80. size_t maxQueueSize,
  81. std::shared_ptr<ThreadFactory> threadFactory)
  82. : CPUThreadPoolExecutor(
  83. numThreads,
  84. std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
  85. numPriorities,
  86. maxQueueSize),
  87. std::move(threadFactory)) {}
  88. CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
  89. stop();
  90. CHECK(threadsToStop_ == 0);
  91. }
  92. void CPUThreadPoolExecutor::add(Func func) {
  93. add(std::move(func), std::chrono::milliseconds(0));
  94. }
  95. void CPUThreadPoolExecutor::add(
  96. Func func,
  97. std::chrono::milliseconds expiration,
  98. Func expireCallback) {
  99. auto result = taskQueue_->add(
  100. CPUTask(std::move(func), expiration, std::move(expireCallback)));
  101. if (!result.reusedThread) {
  102. ensureActiveThreads();
  103. }
  104. }
  105. void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
  106. add(std::move(func), priority, std::chrono::milliseconds(0));
  107. }
  108. void CPUThreadPoolExecutor::add(
  109. Func func,
  110. int8_t priority,
  111. std::chrono::milliseconds expiration,
  112. Func expireCallback) {
  113. CHECK(getNumPriorities() > 0);
  114. auto result = taskQueue_->addWithPriority(
  115. CPUTask(std::move(func), expiration, std::move(expireCallback)),
  116. priority);
  117. if (!result.reusedThread) {
  118. ensureActiveThreads();
  119. }
  120. }
  121. uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
  122. return taskQueue_->getNumPriorities();
  123. }
  124. size_t CPUThreadPoolExecutor::getTaskQueueSize() const {
  125. return taskQueue_->size();
  126. }
  127. BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
  128. CPUThreadPoolExecutor::getTaskQueue() {
  129. return taskQueue_.get();
  130. }
  131. // threadListLock_ must be writelocked.
  132. bool CPUThreadPoolExecutor::tryDecrToStop() {
  133. auto toStop = threadsToStop_.load(std::memory_order_relaxed);
  134. if (toStop <= 0) {
  135. return false;
  136. }
  137. threadsToStop_.store(toStop - 1, std::memory_order_relaxed);
  138. return true;
  139. }
  140. bool CPUThreadPoolExecutor::taskShouldStop(folly::Optional<CPUTask>& task) {
  141. if (tryDecrToStop()) {
  142. return true;
  143. }
  144. if (task) {
  145. return false;
  146. } else {
  147. return tryTimeoutThread();
  148. }
  149. return true;
  150. }
  151. void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
  152. this->threadPoolHook_.registerThread();
  153. thread->startupBaton.post();
  154. while (true) {
  155. auto task = taskQueue_->try_take_for(threadTimeout_);
  156. // Handle thread stopping, either by task timeout, or
  157. // by 'poison' task added in join() or stop().
  158. if (UNLIKELY(!task || task.value().poison)) {
  159. // Actually remove the thread from the list.
  160. SharedMutex::WriteHolder w{&threadListLock_};
  161. if (taskShouldStop(task)) {
  162. for (auto& o : observers_) {
  163. o->threadStopped(thread.get());
  164. }
  165. threadList_.remove(thread);
  166. stoppedThreads_.add(thread);
  167. return;
  168. } else {
  169. continue;
  170. }
  171. }
  172. runTask(thread, std::move(task.value()));
  173. if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
  174. SharedMutex::WriteHolder w{&threadListLock_};
  175. if (tryDecrToStop()) {
  176. threadList_.remove(thread);
  177. stoppedThreads_.add(thread);
  178. return;
  179. }
  180. }
  181. }
  182. }
  183. void CPUThreadPoolExecutor::stopThreads(size_t n) {
  184. threadsToStop_ += n;
  185. for (size_t i = 0; i < n; i++) {
  186. taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
  187. }
  188. }
  189. // threadListLock_ is read (or write) locked.
  190. size_t CPUThreadPoolExecutor::getPendingTaskCountImpl() {
  191. return taskQueue_->size();
  192. }
  193. } // namespace folly