ThreadPoolExecutor.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/executors/ThreadPoolExecutor.h>
  17. #include <folly/executors/GlobalThreadPoolList.h>
  18. #include <folly/synchronization/AsymmetricMemoryBarrier.h>
  19. namespace folly {
  20. using SyncVecThreadPoolExecutors =
  21. folly::Synchronized<std::vector<ThreadPoolExecutor*>>;
  22. SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() {
  23. static Indestructible<SyncVecThreadPoolExecutors> storage;
  24. return *storage;
  25. }
  26. DEFINE_int64(
  27. threadtimeout_ms,
  28. 60000,
  29. "Idle time before ThreadPoolExecutor threads are joined");
  30. ThreadPoolExecutor::ThreadPoolExecutor(
  31. size_t /* maxThreads */,
  32. size_t minThreads,
  33. std::shared_ptr<ThreadFactory> threadFactory,
  34. bool isWaitForAll)
  35. : threadFactory_(std::move(threadFactory)),
  36. isWaitForAll_(isWaitForAll),
  37. taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
  38. threadPoolHook_("folly::ThreadPoolExecutor"),
  39. minThreads_(minThreads),
  40. threadTimeout_(FLAGS_threadtimeout_ms) {
  41. getSyncVecThreadPoolExecutors()->push_back(this);
  42. }
  43. ThreadPoolExecutor::~ThreadPoolExecutor() {
  44. joinKeepAliveOnce();
  45. CHECK_EQ(0, threadList_.get().size());
  46. getSyncVecThreadPoolExecutors().withWLock([this](auto& tpe) {
  47. tpe.erase(std::remove(tpe.begin(), tpe.end(), this), tpe.end());
  48. });
  49. }
  50. ThreadPoolExecutor::Task::Task(
  51. Func&& func,
  52. std::chrono::milliseconds expiration,
  53. Func&& expireCallback)
  54. : func_(std::move(func)),
  55. expiration_(expiration),
  56. expireCallback_(std::move(expireCallback)),
  57. context_(folly::RequestContext::saveContext()) {
  58. // Assume that the task in enqueued on creation
  59. enqueueTime_ = std::chrono::steady_clock::now();
  60. }
  61. void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
  62. thread->idle = false;
  63. auto startTime = std::chrono::steady_clock::now();
  64. task.stats_.waitTime = startTime - task.enqueueTime_;
  65. if (task.expiration_ > std::chrono::milliseconds(0) &&
  66. task.stats_.waitTime >= task.expiration_) {
  67. task.stats_.expired = true;
  68. if (task.expireCallback_ != nullptr) {
  69. task.expireCallback_();
  70. }
  71. } else {
  72. folly::RequestContextScopeGuard rctx(task.context_);
  73. try {
  74. task.func_();
  75. } catch (const std::exception& e) {
  76. LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled "
  77. << typeid(e).name() << " exception: " << e.what();
  78. } catch (...) {
  79. LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
  80. "object";
  81. }
  82. task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
  83. }
  84. thread->idle = true;
  85. thread->lastActiveTime = std::chrono::steady_clock::now();
  86. thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
  87. *thread->taskStatsCallbacks->inCallback = true;
  88. SCOPE_EXIT {
  89. *thread->taskStatsCallbacks->inCallback = false;
  90. };
  91. try {
  92. for (auto& callback : callbacks) {
  93. callback(task.stats_);
  94. }
  95. } catch (const std::exception& e) {
  96. LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
  97. "unhandled "
  98. << typeid(e).name() << " exception: " << e.what();
  99. } catch (...) {
  100. LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
  101. "unhandled non-exception object";
  102. }
  103. });
  104. }
  105. size_t ThreadPoolExecutor::numThreads() {
  106. return maxThreads_.load(std::memory_order_relaxed);
  107. }
  108. size_t ThreadPoolExecutor::numActiveThreads() {
  109. return activeThreads_.load(std::memory_order_relaxed);
  110. }
  111. // Set the maximum number of running threads.
  112. void ThreadPoolExecutor::setNumThreads(size_t numThreads) {
  113. /* Since ThreadPoolExecutor may be dynamically adjusting the number of
  114. threads, we adjust the relevant variables instead of changing
  115. the number of threads directly. Roughly:
  116. If numThreads < minthreads reset minThreads to numThreads.
  117. If numThreads < active threads, reduce number of running threads.
  118. If the number of pending tasks is > 0, then increase the currently
  119. active number of threads such that we can run all the tasks, or reach
  120. numThreads.
  121. Note that if there are observers, we actually have to create all
  122. the threads, because some observer implementations need to 'observe'
  123. all thread creation (see tests for an example of this)
  124. */
  125. size_t numThreadsToJoin = 0;
  126. {
  127. SharedMutex::WriteHolder w{&threadListLock_};
  128. auto pending = getPendingTaskCountImpl();
  129. maxThreads_.store(numThreads, std::memory_order_relaxed);
  130. auto active = activeThreads_.load(std::memory_order_relaxed);
  131. auto minthreads = minThreads_.load(std::memory_order_relaxed);
  132. if (numThreads < minthreads) {
  133. minthreads = numThreads;
  134. minThreads_.store(numThreads, std::memory_order_relaxed);
  135. }
  136. if (active > numThreads) {
  137. numThreadsToJoin = active - numThreads;
  138. if (numThreadsToJoin > active - minthreads) {
  139. numThreadsToJoin = active - minthreads;
  140. }
  141. ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
  142. activeThreads_.store(
  143. active - numThreadsToJoin, std::memory_order_relaxed);
  144. } else if (pending > 0 || observers_.size() > 0 || active < minthreads) {
  145. size_t numToAdd = std::min(pending, numThreads - active);
  146. if (observers_.size() > 0) {
  147. numToAdd = numThreads - active;
  148. }
  149. if (active + numToAdd < minthreads) {
  150. numToAdd = minthreads - active;
  151. }
  152. ThreadPoolExecutor::addThreads(numToAdd);
  153. activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
  154. }
  155. }
  156. /* We may have removed some threads, attempt to join them */
  157. joinStoppedThreads(numThreadsToJoin);
  158. }
  159. // threadListLock_ is writelocked
  160. void ThreadPoolExecutor::addThreads(size_t n) {
  161. std::vector<ThreadPtr> newThreads;
  162. for (size_t i = 0; i < n; i++) {
  163. newThreads.push_back(makeThread());
  164. }
  165. for (auto& thread : newThreads) {
  166. // TODO need a notion of failing to create the thread
  167. // and then handling for that case
  168. thread->handle = threadFactory_->newThread(
  169. std::bind(&ThreadPoolExecutor::threadRun, this, thread));
  170. threadList_.add(thread);
  171. }
  172. for (auto& thread : newThreads) {
  173. thread->startupBaton.wait();
  174. }
  175. for (auto& o : observers_) {
  176. for (auto& thread : newThreads) {
  177. o->threadStarted(thread.get());
  178. }
  179. }
  180. }
  181. // threadListLock_ is writelocked
  182. void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
  183. isJoin_ = isJoin;
  184. stopThreads(n);
  185. }
  186. void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
  187. for (size_t i = 0; i < n; i++) {
  188. auto thread = stoppedThreads_.take();
  189. thread->handle.join();
  190. }
  191. }
  192. void ThreadPoolExecutor::stop() {
  193. joinKeepAliveOnce();
  194. size_t n = 0;
  195. {
  196. SharedMutex::WriteHolder w{&threadListLock_};
  197. maxThreads_.store(0, std::memory_order_release);
  198. activeThreads_.store(0, std::memory_order_release);
  199. n = threadList_.get().size();
  200. removeThreads(n, false);
  201. n += threadsToJoin_.load(std::memory_order_relaxed);
  202. threadsToJoin_.store(0, std::memory_order_relaxed);
  203. }
  204. joinStoppedThreads(n);
  205. CHECK_EQ(0, threadList_.get().size());
  206. CHECK_EQ(0, stoppedThreads_.size());
  207. }
  208. void ThreadPoolExecutor::join() {
  209. joinKeepAliveOnce();
  210. size_t n = 0;
  211. {
  212. SharedMutex::WriteHolder w{&threadListLock_};
  213. maxThreads_.store(0, std::memory_order_release);
  214. activeThreads_.store(0, std::memory_order_release);
  215. n = threadList_.get().size();
  216. removeThreads(n, true);
  217. n += threadsToJoin_.load(std::memory_order_relaxed);
  218. threadsToJoin_.store(0, std::memory_order_relaxed);
  219. }
  220. joinStoppedThreads(n);
  221. CHECK_EQ(0, threadList_.get().size());
  222. CHECK_EQ(0, stoppedThreads_.size());
  223. }
  224. void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
  225. getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
  226. for (auto tpe : tpes) {
  227. f(*tpe);
  228. }
  229. });
  230. }
  231. ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
  232. const auto now = std::chrono::steady_clock::now();
  233. SharedMutex::ReadHolder r{&threadListLock_};
  234. ThreadPoolExecutor::PoolStats stats;
  235. size_t activeTasks = 0;
  236. size_t idleAlive = 0;
  237. for (auto thread : threadList_.get()) {
  238. if (thread->idle) {
  239. const std::chrono::nanoseconds idleTime = now - thread->lastActiveTime;
  240. stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
  241. idleAlive++;
  242. } else {
  243. activeTasks++;
  244. }
  245. }
  246. stats.pendingTaskCount = getPendingTaskCountImpl();
  247. stats.totalTaskCount = stats.pendingTaskCount + activeTasks;
  248. stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
  249. stats.activeThreadCount =
  250. activeThreads_.load(std::memory_order_relaxed) - idleAlive;
  251. stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
  252. return stats;
  253. }
  254. size_t ThreadPoolExecutor::getPendingTaskCount() {
  255. SharedMutex::ReadHolder r{&threadListLock_};
  256. return getPendingTaskCountImpl();
  257. }
  258. std::string ThreadPoolExecutor::getName() {
  259. auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
  260. if (ntf == nullptr) {
  261. return folly::demangle(typeid(*this).name()).toStdString();
  262. }
  263. return ntf->getNamePrefix();
  264. }
  265. std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
  266. void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
  267. if (*taskStatsCallbacks_->inCallback) {
  268. throw std::runtime_error("cannot subscribe in task stats callback");
  269. }
  270. taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
  271. }
  272. BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add(
  273. ThreadPoolExecutor::ThreadPtr item) {
  274. std::lock_guard<std::mutex> guard(mutex_);
  275. queue_.push(std::move(item));
  276. return sem_.post();
  277. }
  278. ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
  279. while (true) {
  280. {
  281. std::lock_guard<std::mutex> guard(mutex_);
  282. if (queue_.size() > 0) {
  283. auto item = std::move(queue_.front());
  284. queue_.pop();
  285. return item;
  286. }
  287. }
  288. sem_.wait();
  289. }
  290. }
  291. folly::Optional<ThreadPoolExecutor::ThreadPtr>
  292. ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
  293. std::chrono::milliseconds time) {
  294. while (true) {
  295. {
  296. std::lock_guard<std::mutex> guard(mutex_);
  297. if (queue_.size() > 0) {
  298. auto item = std::move(queue_.front());
  299. queue_.pop();
  300. return item;
  301. }
  302. }
  303. if (!sem_.try_wait_for(time)) {
  304. return folly::none;
  305. }
  306. }
  307. }
  308. size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
  309. std::lock_guard<std::mutex> guard(mutex_);
  310. return queue_.size();
  311. }
  312. void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
  313. {
  314. SharedMutex::WriteHolder r{&threadListLock_};
  315. observers_.push_back(o);
  316. for (auto& thread : threadList_.get()) {
  317. o->threadPreviouslyStarted(thread.get());
  318. }
  319. }
  320. while (activeThreads_.load(std::memory_order_relaxed) <
  321. maxThreads_.load(std::memory_order_relaxed)) {
  322. ensureActiveThreads();
  323. }
  324. }
  325. void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
  326. SharedMutex::WriteHolder r{&threadListLock_};
  327. for (auto& thread : threadList_.get()) {
  328. o->threadNotYetStopped(thread.get());
  329. }
  330. for (auto it = observers_.begin(); it != observers_.end(); it++) {
  331. if (*it == o) {
  332. observers_.erase(it);
  333. return;
  334. }
  335. }
  336. DCHECK(false);
  337. }
  338. // Idle threads may have destroyed themselves, attempt to join
  339. // them here
  340. void ThreadPoolExecutor::ensureJoined() {
  341. auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
  342. if (tojoin) {
  343. {
  344. SharedMutex::WriteHolder w{&threadListLock_};
  345. tojoin = threadsToJoin_.load(std::memory_order_relaxed);
  346. threadsToJoin_.store(0, std::memory_order_relaxed);
  347. }
  348. joinStoppedThreads(tojoin);
  349. }
  350. }
  351. // threadListLock_ must be write locked.
  352. bool ThreadPoolExecutor::tryTimeoutThread() {
  353. // Try to stop based on idle thread timeout (try_take_for),
  354. // if there are at least minThreads running.
  355. if (!minActive()) {
  356. return false;
  357. }
  358. // Remove thread from active count
  359. activeThreads_.store(
  360. activeThreads_.load(std::memory_order_relaxed) - 1,
  361. std::memory_order_relaxed);
  362. // There is a memory ordering constraint w.r.t the queue
  363. // implementation's add() and getPendingTaskCountImpl() - while many
  364. // queues have seq_cst ordering, some do not, so add an explicit
  365. // barrier. tryTimeoutThread is the slow path and only happens once
  366. // every thread timeout; use asymmetric barrier to keep add() fast.
  367. asymmetricHeavyBarrier();
  368. // If this is based on idle thread timeout, then
  369. // adjust vars appropriately (otherwise stop() or join()
  370. // does this).
  371. if (getPendingTaskCountImpl() > 0) {
  372. // There are still pending tasks, we can't stop yet.
  373. // re-up active threads and return.
  374. activeThreads_.store(
  375. activeThreads_.load(std::memory_order_relaxed) + 1,
  376. std::memory_order_relaxed);
  377. return false;
  378. }
  379. threadsToJoin_.store(
  380. threadsToJoin_.load(std::memory_order_relaxed) + 1,
  381. std::memory_order_relaxed);
  382. return true;
  383. }
  384. // If we can't ensure that we were able to hand off a task to a thread,
  385. // attempt to start a thread that handled the task, if we aren't already
  386. // running the maximum number of threads.
  387. void ThreadPoolExecutor::ensureActiveThreads() {
  388. ensureJoined();
  389. // Matches barrier in tryTimeoutThread(). Ensure task added
  390. // is seen before loading activeThreads_ below.
  391. asymmetricLightBarrier();
  392. // Fast path assuming we are already at max threads.
  393. auto active = activeThreads_.load(std::memory_order_relaxed);
  394. auto total = maxThreads_.load(std::memory_order_relaxed);
  395. if (active >= total) {
  396. return;
  397. }
  398. SharedMutex::WriteHolder w{&threadListLock_};
  399. // Double check behind lock.
  400. active = activeThreads_.load(std::memory_order_relaxed);
  401. total = maxThreads_.load(std::memory_order_relaxed);
  402. if (active >= total) {
  403. return;
  404. }
  405. ThreadPoolExecutor::addThreads(1);
  406. activeThreads_.store(active + 1, std::memory_order_relaxed);
  407. }
  408. // If an idle thread times out, only join it if there are at least
  409. // minThreads threads.
  410. bool ThreadPoolExecutor::minActive() {
  411. return activeThreads_.load(std::memory_order_relaxed) >
  412. minThreads_.load(std::memory_order_relaxed);
  413. }
  414. } // namespace folly