FunctionScheduler.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/experimental/FunctionScheduler.h>
  17. #include <random>
  18. #include <folly/Conv.h>
  19. #include <folly/Random.h>
  20. #include <folly/String.h>
  21. #include <folly/system/ThreadName.h>
  22. using std::chrono::milliseconds;
  23. using std::chrono::steady_clock;
  24. namespace folly {
  25. namespace {
  26. struct ConsistentDelayFunctor {
  27. const milliseconds constInterval;
  28. explicit ConsistentDelayFunctor(milliseconds interval)
  29. : constInterval(interval) {
  30. if (interval < milliseconds::zero()) {
  31. throw std::invalid_argument(
  32. "FunctionScheduler: "
  33. "time interval must be non-negative");
  34. }
  35. }
  36. steady_clock::time_point operator()(
  37. steady_clock::time_point curNextRunTime,
  38. steady_clock::time_point curTime) const {
  39. auto intervalsPassed = (curTime - curNextRunTime) / constInterval;
  40. return (intervalsPassed + 1) * constInterval + curNextRunTime;
  41. }
  42. };
  43. struct ConstIntervalFunctor {
  44. const milliseconds constInterval;
  45. explicit ConstIntervalFunctor(milliseconds interval)
  46. : constInterval(interval) {
  47. if (interval < milliseconds::zero()) {
  48. throw std::invalid_argument(
  49. "FunctionScheduler: "
  50. "time interval must be non-negative");
  51. }
  52. }
  53. milliseconds operator()() const {
  54. return constInterval;
  55. }
  56. };
  57. struct PoissonDistributionFunctor {
  58. std::default_random_engine generator;
  59. std::poisson_distribution<int> poissonRandom;
  60. explicit PoissonDistributionFunctor(double meanPoissonMs)
  61. : poissonRandom(meanPoissonMs) {
  62. if (meanPoissonMs < 0.0) {
  63. throw std::invalid_argument(
  64. "FunctionScheduler: "
  65. "Poisson mean interval must be non-negative");
  66. }
  67. }
  68. milliseconds operator()() {
  69. return milliseconds(poissonRandom(generator));
  70. }
  71. };
  72. struct UniformDistributionFunctor {
  73. std::default_random_engine generator;
  74. std::uniform_int_distribution<milliseconds::rep> dist;
  75. UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
  76. : generator(Random::rand32()),
  77. dist(minInterval.count(), maxInterval.count()) {
  78. if (minInterval > maxInterval) {
  79. throw std::invalid_argument(
  80. "FunctionScheduler: "
  81. "min time interval must be less or equal than max interval");
  82. }
  83. if (minInterval < milliseconds::zero()) {
  84. throw std::invalid_argument(
  85. "FunctionScheduler: "
  86. "time interval must be non-negative");
  87. }
  88. }
  89. milliseconds operator()() {
  90. return milliseconds(dist(generator));
  91. }
  92. };
  93. } // namespace
  94. FunctionScheduler::FunctionScheduler() {}
  95. FunctionScheduler::~FunctionScheduler() {
  96. // make sure to stop the thread (if running)
  97. shutdown();
  98. }
  99. void FunctionScheduler::addFunction(
  100. Function<void()>&& cb,
  101. milliseconds interval,
  102. StringPiece nameID,
  103. milliseconds startDelay) {
  104. addFunctionInternal(
  105. std::move(cb),
  106. ConstIntervalFunctor(interval),
  107. nameID.str(),
  108. to<std::string>(interval.count(), "ms"),
  109. startDelay,
  110. false /*runOnce*/);
  111. }
  112. void FunctionScheduler::addFunction(
  113. Function<void()>&& cb,
  114. milliseconds interval,
  115. const LatencyDistribution& latencyDistr,
  116. StringPiece nameID,
  117. milliseconds startDelay) {
  118. if (latencyDistr.isPoisson) {
  119. addFunctionInternal(
  120. std::move(cb),
  121. PoissonDistributionFunctor(latencyDistr.poissonMean),
  122. nameID.str(),
  123. to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
  124. startDelay,
  125. false /*runOnce*/);
  126. } else {
  127. addFunction(std::move(cb), interval, nameID, startDelay);
  128. }
  129. }
  130. void FunctionScheduler::addFunctionOnce(
  131. Function<void()>&& cb,
  132. StringPiece nameID,
  133. milliseconds startDelay) {
  134. addFunctionInternal(
  135. std::move(cb),
  136. ConstIntervalFunctor(milliseconds::zero()),
  137. nameID.str(),
  138. "once",
  139. startDelay,
  140. true /*runOnce*/);
  141. }
  142. void FunctionScheduler::addFunctionUniformDistribution(
  143. Function<void()>&& cb,
  144. milliseconds minInterval,
  145. milliseconds maxInterval,
  146. StringPiece nameID,
  147. milliseconds startDelay) {
  148. addFunctionInternal(
  149. std::move(cb),
  150. UniformDistributionFunctor(minInterval, maxInterval),
  151. nameID.str(),
  152. to<std::string>(
  153. "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
  154. startDelay,
  155. false /*runOnce*/);
  156. }
  157. void FunctionScheduler::addFunctionConsistentDelay(
  158. Function<void()>&& cb,
  159. milliseconds interval,
  160. StringPiece nameID,
  161. milliseconds startDelay) {
  162. addFunctionInternal(
  163. std::move(cb),
  164. ConsistentDelayFunctor(interval),
  165. nameID.str(),
  166. to<std::string>(interval.count(), "ms"),
  167. startDelay,
  168. false /*runOnce*/);
  169. }
  170. void FunctionScheduler::addFunctionGenericDistribution(
  171. Function<void()>&& cb,
  172. IntervalDistributionFunc&& intervalFunc,
  173. const std::string& nameID,
  174. const std::string& intervalDescr,
  175. milliseconds startDelay) {
  176. addFunctionInternal(
  177. std::move(cb),
  178. std::move(intervalFunc),
  179. nameID,
  180. intervalDescr,
  181. startDelay,
  182. false /*runOnce*/);
  183. }
  184. void FunctionScheduler::addFunctionGenericNextRunTimeFunctor(
  185. Function<void()>&& cb,
  186. NextRunTimeFunc&& fn,
  187. const std::string& nameID,
  188. const std::string& intervalDescr,
  189. milliseconds startDelay) {
  190. addFunctionInternal(
  191. std::move(cb),
  192. std::move(fn),
  193. nameID,
  194. intervalDescr,
  195. startDelay,
  196. false /*runOnce*/);
  197. }
  198. template <typename RepeatFuncNextRunTimeFunc>
  199. void FunctionScheduler::addFunctionToHeapChecked(
  200. Function<void()>&& cb,
  201. RepeatFuncNextRunTimeFunc&& fn,
  202. const std::string& nameID,
  203. const std::string& intervalDescr,
  204. milliseconds startDelay,
  205. bool runOnce) {
  206. if (!cb) {
  207. throw std::invalid_argument(
  208. "FunctionScheduler: Scheduled function must be set");
  209. }
  210. if (!fn) {
  211. throw std::invalid_argument(
  212. "FunctionScheduler: "
  213. "interval distribution or next run time function must be set");
  214. }
  215. if (startDelay < milliseconds::zero()) {
  216. throw std::invalid_argument(
  217. "FunctionScheduler: start delay must be non-negative");
  218. }
  219. std::unique_lock<std::mutex> l(mutex_);
  220. auto it = functionsMap_.find(nameID);
  221. // check if the nameID is unique
  222. if (it != functionsMap_.end() && it->second->isValid()) {
  223. throw std::invalid_argument(to<std::string>(
  224. "FunctionScheduler: a function named \"", nameID, "\" already exists"));
  225. }
  226. if (currentFunction_ && currentFunction_->name == nameID) {
  227. throw std::invalid_argument(to<std::string>(
  228. "FunctionScheduler: a function named \"", nameID, "\" already exists"));
  229. }
  230. addFunctionToHeap(
  231. l,
  232. std::make_unique<RepeatFunc>(
  233. std::move(cb),
  234. std::forward<RepeatFuncNextRunTimeFunc>(fn),
  235. nameID,
  236. intervalDescr,
  237. startDelay,
  238. runOnce));
  239. }
  240. void FunctionScheduler::addFunctionInternal(
  241. Function<void()>&& cb,
  242. NextRunTimeFunc&& fn,
  243. const std::string& nameID,
  244. const std::string& intervalDescr,
  245. milliseconds startDelay,
  246. bool runOnce) {
  247. return addFunctionToHeapChecked(
  248. std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
  249. }
  250. void FunctionScheduler::addFunctionInternal(
  251. Function<void()>&& cb,
  252. IntervalDistributionFunc&& fn,
  253. const std::string& nameID,
  254. const std::string& intervalDescr,
  255. milliseconds startDelay,
  256. bool runOnce) {
  257. return addFunctionToHeapChecked(
  258. std::move(cb), std::move(fn), nameID, intervalDescr, startDelay, runOnce);
  259. }
  260. bool FunctionScheduler::cancelFunctionWithLock(
  261. std::unique_lock<std::mutex>& lock,
  262. StringPiece nameID) {
  263. CHECK_EQ(lock.owns_lock(), true);
  264. if (currentFunction_ && currentFunction_->name == nameID) {
  265. functionsMap_.erase(currentFunction_->name);
  266. // This function is currently being run. Clear currentFunction_
  267. // The running thread will see this and won't reschedule the function.
  268. currentFunction_ = nullptr;
  269. cancellingCurrentFunction_ = true;
  270. return true;
  271. }
  272. return false;
  273. }
  274. bool FunctionScheduler::cancelFunction(StringPiece nameID) {
  275. std::unique_lock<std::mutex> l(mutex_);
  276. if (cancelFunctionWithLock(l, nameID)) {
  277. return true;
  278. }
  279. auto it = functionsMap_.find(nameID);
  280. if (it != functionsMap_.end() && it->second->isValid()) {
  281. cancelFunction(l, it->second);
  282. return true;
  283. }
  284. return false;
  285. }
  286. bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
  287. std::unique_lock<std::mutex> l(mutex_);
  288. if (cancelFunctionWithLock(l, nameID)) {
  289. runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
  290. return true;
  291. }
  292. auto it = functionsMap_.find(nameID);
  293. if (it != functionsMap_.end() && it->second->isValid()) {
  294. cancelFunction(l, it->second);
  295. return true;
  296. }
  297. return false;
  298. }
  299. void FunctionScheduler::cancelFunction(
  300. const std::unique_lock<std::mutex>& l,
  301. RepeatFunc* it) {
  302. // This function should only be called with mutex_ already locked.
  303. DCHECK(l.mutex() == &mutex_);
  304. DCHECK(l.owns_lock());
  305. functionsMap_.erase(it->name);
  306. it->cancel();
  307. }
  308. bool FunctionScheduler::cancelAllFunctionsWithLock(
  309. std::unique_lock<std::mutex>& lock) {
  310. CHECK_EQ(lock.owns_lock(), true);
  311. functions_.clear();
  312. functionsMap_.clear();
  313. if (currentFunction_) {
  314. cancellingCurrentFunction_ = true;
  315. }
  316. currentFunction_ = nullptr;
  317. return cancellingCurrentFunction_;
  318. }
  319. void FunctionScheduler::cancelAllFunctions() {
  320. std::unique_lock<std::mutex> l(mutex_);
  321. cancelAllFunctionsWithLock(l);
  322. }
  323. void FunctionScheduler::cancelAllFunctionsAndWait() {
  324. std::unique_lock<std::mutex> l(mutex_);
  325. if (cancelAllFunctionsWithLock(l)) {
  326. runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
  327. }
  328. }
  329. bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
  330. std::unique_lock<std::mutex> l(mutex_);
  331. if (currentFunction_ && currentFunction_->name == nameID) {
  332. if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
  333. return false;
  334. }
  335. currentFunction_->resetNextRunTime(steady_clock::now());
  336. return true;
  337. }
  338. // Since __adjust_heap() isn't a part of the standard API, there's no way to
  339. // fix the heap ordering if we adjust the key (nextRunTime) for the existing
  340. // RepeatFunc. Instead, we just cancel it and add an identical object.
  341. auto it = functionsMap_.find(nameID);
  342. if (it != functionsMap_.end() && it->second->isValid()) {
  343. if (running_) {
  344. it->second->resetNextRunTime(steady_clock::now());
  345. std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
  346. runningCondvar_.notify_one();
  347. }
  348. return true;
  349. }
  350. return false;
  351. }
  352. bool FunctionScheduler::start() {
  353. std::unique_lock<std::mutex> l(mutex_);
  354. if (running_) {
  355. return false;
  356. }
  357. VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
  358. << " functions.";
  359. auto now = steady_clock::now();
  360. // Reset the next run time. for all functions.
  361. // note: this is needed since one can shutdown() and start() again
  362. for (const auto& f : functions_) {
  363. f->resetNextRunTime(now);
  364. VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
  365. << ", period = " << f->intervalDescr
  366. << ", delay = " << f->startDelay.count() << "ms";
  367. }
  368. std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
  369. thread_ = std::thread([&] { this->run(); });
  370. running_ = true;
  371. return true;
  372. }
  373. bool FunctionScheduler::shutdown() {
  374. {
  375. std::lock_guard<std::mutex> g(mutex_);
  376. if (!running_) {
  377. return false;
  378. }
  379. running_ = false;
  380. runningCondvar_.notify_one();
  381. }
  382. thread_.join();
  383. return true;
  384. }
  385. void FunctionScheduler::run() {
  386. std::unique_lock<std::mutex> lock(mutex_);
  387. if (!threadName_.empty()) {
  388. folly::setThreadName(threadName_);
  389. }
  390. while (running_) {
  391. // If we have nothing to run, wait until a function is added or until we
  392. // are stopped.
  393. if (functions_.empty()) {
  394. runningCondvar_.wait(lock);
  395. continue;
  396. }
  397. auto now = steady_clock::now();
  398. // Move the next function to run to the end of functions_
  399. std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
  400. // Check to see if the function was cancelled.
  401. // If so, just remove it and continue around the loop.
  402. if (!functions_.back()->isValid()) {
  403. functions_.pop_back();
  404. continue;
  405. }
  406. auto sleepTime = functions_.back()->getNextRunTime() - now;
  407. if (sleepTime < milliseconds::zero()) {
  408. // We need to run this function now
  409. runOneFunction(lock, now);
  410. runningCondvar_.notify_all();
  411. } else {
  412. // Re-add the function to the heap, and wait until we actually
  413. // need to run it.
  414. std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
  415. runningCondvar_.wait_for(lock, sleepTime);
  416. }
  417. }
  418. }
  419. void FunctionScheduler::runOneFunction(
  420. std::unique_lock<std::mutex>& lock,
  421. steady_clock::time_point now) {
  422. DCHECK(lock.mutex() == &mutex_);
  423. DCHECK(lock.owns_lock());
  424. // The function to run will be at the end of functions_ already.
  425. //
  426. // Fully remove it from functions_ now.
  427. // We need to release mutex_ while we invoke this function, and we need to
  428. // maintain the heap property on functions_ while mutex_ is unlocked.
  429. auto func = std::move(functions_.back());
  430. functions_.pop_back();
  431. if (!func->cb) {
  432. VLOG(5) << func->name << "function has been canceled while waiting";
  433. return;
  434. }
  435. currentFunction_ = func.get();
  436. // Update the function's next run time.
  437. if (steady_) {
  438. // This allows scheduler to catch up
  439. func->setNextRunTimeSteady();
  440. } else {
  441. // Note that we set nextRunTime based on the current time where we started
  442. // the function call, rather than the time when the function finishes.
  443. // This ensures that we call the function once every time interval, as
  444. // opposed to waiting time interval seconds between calls. (These can be
  445. // different if the function takes a significant amount of time to run.)
  446. func->setNextRunTimeStrict(now);
  447. }
  448. // Release the lock while we invoke the user's function
  449. lock.unlock();
  450. // Invoke the function
  451. try {
  452. VLOG(5) << "Now running " << func->name;
  453. func->cb();
  454. } catch (const std::exception& ex) {
  455. LOG(ERROR) << "Error running the scheduled function <" << func->name
  456. << ">: " << exceptionStr(ex);
  457. }
  458. // Re-acquire the lock
  459. lock.lock();
  460. if (!currentFunction_) {
  461. // The function was cancelled while we were running it.
  462. // We shouldn't reschedule it;
  463. cancellingCurrentFunction_ = false;
  464. return;
  465. }
  466. if (currentFunction_->runOnce) {
  467. // Don't reschedule if the function only needed to run once.
  468. functionsMap_.erase(currentFunction_->name);
  469. currentFunction_ = nullptr;
  470. return;
  471. }
  472. // Re-insert the function into our functions_ heap.
  473. // We only maintain the heap property while running_ is set. (running_ may
  474. // have been cleared while we were invoking the user's function.)
  475. functions_.push_back(std::move(func));
  476. // Clear currentFunction_
  477. currentFunction_ = nullptr;
  478. if (running_) {
  479. std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
  480. }
  481. }
  482. void FunctionScheduler::addFunctionToHeap(
  483. const std::unique_lock<std::mutex>& lock,
  484. std::unique_ptr<RepeatFunc> func) {
  485. // This function should only be called with mutex_ already locked.
  486. DCHECK(lock.mutex() == &mutex_);
  487. DCHECK(lock.owns_lock());
  488. functions_.push_back(std::move(func));
  489. functionsMap_[functions_.back()->name] = functions_.back().get();
  490. if (running_) {
  491. functions_.back()->resetNextRunTime(steady_clock::now());
  492. std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
  493. // Signal the running thread to wake up and see if it needs to change
  494. // its current scheduling decision.
  495. runningCondvar_.notify_one();
  496. }
  497. }
  498. void FunctionScheduler::setThreadName(StringPiece threadName) {
  499. std::unique_lock<std::mutex> l(mutex_);
  500. threadName_ = threadName.str();
  501. }
  502. } // namespace folly