FunctionSchedulerTest.cpp 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  #include <algorithm>
  #include <atomic>
  #include <cassert>
  #include <cerrno>
  #include <chrono>
  #include <random>
  #include <thread>
  #include <boost/thread.hpp>
  #include <folly/Random.h>
  #include <folly/experimental/FunctionScheduler.h>
  #include <folly/portability/GTest.h>
  #include <folly/synchronization/Baton.h>
  25. #if defined(__linux__)
  26. #include <dlfcn.h>
  27. #endif
  28. using namespace folly;
  29. using std::atomic;
  30. using std::chrono::duration_cast;
  31. using std::chrono::microseconds;
  32. using std::chrono::milliseconds;
  33. using std::chrono::steady_clock;
  namespace {

  /*
   * Helper functions for controlling how long this test takes.
   *
   * Using larger intervals here will make the tests less flaky when run on
   * heavily loaded systems. However, this will also make the tests take longer
   * to run.
   */

  // Length of one test "tick"; every interval and delay in this file is
  // expressed as a multiple of it.
  static const auto timeFactor = std::chrono::milliseconds(400);

  // Returns the duration of `n` ticks (n may be negative; callers use that to
  // build invalid intervals).
  std::chrono::milliseconds testInterval(int n) {
    return n * timeFactor;
  }

  // Clamps `n` into the closed range [min, max]. Requires min <= max.
  int getTicksWithinRange(int n, int min, int max) {
    assert(min <= max);
    return std::min(max, std::max(min, n));
  }

  // Blocks the calling thread for `n` ticks; fractional values are allowed.
  //
  // Uses std::this_thread::sleep_for rather than usleep(): POSIX only defines
  // usleep() for arguments below 1,000,000 microseconds, and several callers
  // sleep for multiple seconds (e.g. delay(5), delay(10)).
  void delay(float n) {
    const auto tickUsec =
        std::chrono::duration_cast<std::chrono::microseconds>(timeFactor)
            .count();
    const std::chrono::microseconds usec(
        static_cast<std::chrono::microseconds::rep>(tickUsec * n));
    std::this_thread::sleep_for(usec);
  }

  } // namespace
  58. TEST(FunctionScheduler, StartAndShutdown) {
  59. FunctionScheduler fs;
  60. EXPECT_TRUE(fs.start());
  61. EXPECT_FALSE(fs.start());
  62. EXPECT_TRUE(fs.shutdown());
  63. EXPECT_FALSE(fs.shutdown());
  64. // start again
  65. EXPECT_TRUE(fs.start());
  66. EXPECT_FALSE(fs.start());
  67. EXPECT_TRUE(fs.shutdown());
  68. EXPECT_FALSE(fs.shutdown());
  69. }
  70. TEST(FunctionScheduler, SimpleAdd) {
  71. atomic<int> total{0};
  72. FunctionScheduler fs;
  73. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  74. fs.start();
  75. delay(1);
  76. EXPECT_EQ(2, total);
  77. fs.shutdown();
  78. delay(2);
  79. EXPECT_EQ(2, total);
  80. }
  81. TEST(FunctionScheduler, AddCancel) {
  82. atomic<int> total{0};
  83. FunctionScheduler fs;
  84. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  85. fs.start();
  86. delay(1);
  87. EXPECT_EQ(2, total);
  88. delay(2);
  89. EXPECT_EQ(4, total);
  90. EXPECT_TRUE(fs.cancelFunction("add2"));
  91. EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
  92. delay(2);
  93. EXPECT_EQ(4, total);
  94. fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
  95. delay(1);
  96. EXPECT_EQ(5, total);
  97. delay(2);
  98. EXPECT_EQ(6, total);
  99. fs.shutdown();
  100. }
  101. TEST(FunctionScheduler, AddCancel2) {
  102. atomic<int> total{0};
  103. FunctionScheduler fs;
  104. // Test adds and cancels while the scheduler is stopped
  105. EXPECT_FALSE(fs.cancelFunction("add2"));
  106. fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
  107. EXPECT_TRUE(fs.cancelFunction("add2"));
  108. EXPECT_FALSE(fs.cancelFunction("add2"));
  109. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  110. fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
  111. EXPECT_EQ(0, total);
  112. fs.start();
  113. delay(1);
  114. EXPECT_EQ(5, total);
  115. // Cancel add2 while the scheduler is running
  116. EXPECT_TRUE(fs.cancelFunction("add2"));
  117. EXPECT_FALSE(fs.cancelFunction("add2"));
  118. EXPECT_FALSE(fs.cancelFunction("bogus"));
  119. delay(3);
  120. EXPECT_EQ(8, total);
  121. EXPECT_TRUE(fs.cancelFunction("add3"));
  122. // Test a function that cancels itself
  123. atomic<int> selfCancelCount{0};
  124. fs.addFunction(
  125. [&] {
  126. ++selfCancelCount;
  127. if (selfCancelCount > 2) {
  128. fs.cancelFunction("selfCancel");
  129. }
  130. },
  131. testInterval(1),
  132. "selfCancel",
  133. testInterval(1));
  134. delay(4);
  135. EXPECT_EQ(3, selfCancelCount);
  136. EXPECT_FALSE(fs.cancelFunction("selfCancel"));
  137. // Test a function that schedules another function
  138. atomic<int> adderCount{0};
  139. int fn2Count = 0;
  140. auto fn2 = [&] { ++fn2Count; };
  141. auto fnAdder = [&] {
  142. ++adderCount;
  143. if (adderCount == 2) {
  144. fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
  145. }
  146. };
  147. fs.addFunction(fnAdder, testInterval(4), "adder");
  148. // t0: adder fires
  149. delay(1); // t1
  150. EXPECT_EQ(1, adderCount);
  151. EXPECT_EQ(0, fn2Count);
  152. // t4: adder fires, schedules fn2
  153. delay(4); // t5
  154. EXPECT_EQ(2, adderCount);
  155. EXPECT_EQ(0, fn2Count);
  156. // t6: fn2 fires
  157. delay(2); // t7
  158. EXPECT_EQ(2, adderCount);
  159. EXPECT_EQ(1, fn2Count);
  160. // t8: adder fires
  161. // t9: fn2 fires
  162. delay(3); // t10
  163. EXPECT_EQ(3, adderCount);
  164. EXPECT_EQ(2, fn2Count);
  165. EXPECT_TRUE(fs.cancelFunction("fn2"));
  166. EXPECT_TRUE(fs.cancelFunction("adder"));
  167. delay(5); // t10
  168. EXPECT_EQ(3, adderCount);
  169. EXPECT_EQ(2, fn2Count);
  170. EXPECT_EQ(8, total);
  171. EXPECT_EQ(3, selfCancelCount);
  172. }
  173. TEST(FunctionScheduler, AddMultiple) {
  174. atomic<int> total{0};
  175. FunctionScheduler fs;
  176. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  177. fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
  178. EXPECT_THROW(
  179. fs.addFunction([&] { total += 2; }, testInterval(2), "add2"),
  180. std::invalid_argument); // function name already exists
  181. fs.start();
  182. delay(1);
  183. EXPECT_EQ(5, total);
  184. delay(4);
  185. EXPECT_EQ(12, total);
  186. EXPECT_TRUE(fs.cancelFunction("add2"));
  187. delay(2);
  188. EXPECT_EQ(15, total);
  189. fs.shutdown();
  190. delay(3);
  191. EXPECT_EQ(15, total);
  192. fs.shutdown();
  193. }
  194. TEST(FunctionScheduler, AddAfterStart) {
  195. atomic<int> total{0};
  196. FunctionScheduler fs;
  197. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  198. fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
  199. fs.start();
  200. delay(3);
  201. EXPECT_EQ(10, total);
  202. fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
  203. delay(2);
  204. EXPECT_EQ(17, total);
  205. }
  206. TEST(FunctionScheduler, ShutdownStart) {
  207. atomic<int> total{0};
  208. FunctionScheduler fs;
  209. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  210. fs.start();
  211. delay(1);
  212. fs.shutdown();
  213. fs.start();
  214. delay(1);
  215. EXPECT_EQ(4, total);
  216. EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
  217. delay(2);
  218. EXPECT_EQ(6, total);
  219. }
  220. TEST(FunctionScheduler, ResetFunc) {
  221. atomic<int> total{0};
  222. FunctionScheduler fs;
  223. fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
  224. fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
  225. fs.start();
  226. delay(1);
  227. EXPECT_EQ(5, total);
  228. EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
  229. EXPECT_TRUE(fs.resetFunctionTimer("add2"));
  230. delay(1);
  231. // t2: after the reset, add2 should have been invoked immediately
  232. EXPECT_EQ(7, total);
  233. delay(1.5);
  234. // t3.5: add3 should have been invoked. add2 should not
  235. EXPECT_EQ(10, total);
  236. delay(1);
  237. // t4.5: add2 should have been invoked once more (it was reset at t1)
  238. EXPECT_EQ(12, total);
  239. }
  240. TEST(FunctionScheduler, ResetFunc2) {
  241. atomic<int> total{0};
  242. FunctionScheduler fs;
  243. fs.addFunctionOnce([&] { total += 2; }, "add2", testInterval(1));
  244. fs.addFunctionOnce([&] { total += 3; }, "add3", testInterval(1));
  245. fs.start();
  246. delay(2);
  247. fs.addFunctionOnce([&] { total += 3; }, "add4", testInterval(2));
  248. EXPECT_TRUE(fs.resetFunctionTimer("add4"));
  249. fs.addFunctionOnce([&] { total += 3; }, "add6", testInterval(2));
  250. delay(1);
  251. EXPECT_TRUE(fs.resetFunctionTimer("add4"));
  252. delay(3);
  253. EXPECT_FALSE(fs.resetFunctionTimer("add3"));
  254. fs.addFunctionOnce([&] { total += 3; }, "add4", testInterval(1));
  255. }
  256. TEST(FunctionScheduler, ResetFuncWhileRunning) {
  257. struct State {
  258. boost::barrier barrier_a{2};
  259. boost::barrier barrier_b{2};
  260. boost::barrier barrier_c{2};
  261. boost::barrier barrier_d{2};
  262. bool set = false;
  263. size_t count = 0;
  264. };
  265. State state; // held by ref
  266. auto mv = std::make_shared<size_t>(); // gets moved
  267. FunctionScheduler fs;
  268. fs.addFunction(
  269. [&, mv /* ref + shared_ptr fit in in-situ storage */] {
  270. if (!state.set) { // first invocation
  271. state.barrier_a.wait();
  272. // ensure that resetFunctionTimer is called in this critical section
  273. state.barrier_b.wait();
  274. ++state.count;
  275. EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
  276. state.barrier_c.wait();
  277. // main thread checks count here
  278. state.barrier_d.wait();
  279. } else { // subsequent invocations
  280. ++state.count;
  281. }
  282. },
  283. testInterval(3),
  284. "nada");
  285. fs.start();
  286. state.barrier_a.wait();
  287. state.set = true;
  288. fs.resetFunctionTimer("nada");
  289. EXPECT_EQ(0, state.count) << "sanity check";
  290. state.barrier_b.wait();
  291. // fn thread increments count and checks mv here
  292. state.barrier_c.wait();
  293. EXPECT_EQ(1, state.count) << "sanity check";
  294. state.barrier_d.wait();
  295. delay(1);
  296. EXPECT_EQ(2, state.count) << "sanity check";
  297. }
  298. TEST(FunctionScheduler, AddInvalid) {
  299. atomic<int> total{0};
  300. FunctionScheduler fs;
  301. // interval may not be negative
  302. EXPECT_THROW(
  303. fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
  304. std::invalid_argument);
  305. EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
  306. }
  307. TEST(FunctionScheduler, NoFunctions) {
  308. FunctionScheduler fs;
  309. EXPECT_TRUE(fs.start());
  310. fs.shutdown();
  311. FunctionScheduler fs2;
  312. fs2.shutdown();
  313. }
  314. TEST(FunctionScheduler, AddWhileRunning) {
  315. atomic<int> total{0};
  316. FunctionScheduler fs;
  317. fs.start();
  318. delay(1);
  319. fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
  320. // The function should be invoked nearly immediately when we add it
  321. // and the FunctionScheduler is already running
  322. delay(0.5);
  323. auto t = total.load();
  324. EXPECT_EQ(2, t);
  325. delay(2);
  326. t = total.load();
  327. EXPECT_EQ(4, t);
  328. }
  329. TEST(FunctionScheduler, NoShutdown) {
  330. atomic<int> total{0};
  331. {
  332. FunctionScheduler fs;
  333. fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
  334. fs.start();
  335. delay(0.5);
  336. EXPECT_EQ(2, total);
  337. }
  338. // Destroyed the FunctionScheduler without calling shutdown.
  339. // Everything should have been cleaned up, and the function will no longer
  340. // get called.
  341. delay(2);
  342. EXPECT_EQ(2, total);
  343. }
  344. TEST(FunctionScheduler, StartDelay) {
  345. atomic<int> total{0};
  346. FunctionScheduler fs;
  347. fs.addFunction([&] { total += 2; }, testInterval(2), "add2", testInterval(2));
  348. fs.addFunction([&] { total += 3; }, testInterval(3), "add3", testInterval(2));
  349. EXPECT_THROW(
  350. fs.addFunction(
  351. [&] { total += 2; }, testInterval(3), "addX", testInterval(-1)),
  352. std::invalid_argument);
  353. fs.start();
  354. delay(1); // t1
  355. EXPECT_EQ(0, total);
  356. // t2 : add2 total=2
  357. // t2 : add3 total=5
  358. delay(2); // t3
  359. EXPECT_EQ(5, total);
  360. // t4 : add2: total=7
  361. // t5 : add3: total=10
  362. // t6 : add2: total=12
  363. delay(4); // t7
  364. EXPECT_EQ(12, total);
  365. fs.cancelFunction("add2");
  366. // t8 : add3: total=15
  367. delay(2); // t9
  368. EXPECT_EQ(15, total);
  369. fs.shutdown();
  370. delay(3);
  371. EXPECT_EQ(15, total);
  372. fs.shutdown();
  373. }
  374. TEST(FunctionScheduler, NoSteadyCatchup) {
  375. std::atomic<int> ticks(0);
  376. FunctionScheduler fs;
  377. // fs.setSteady(false); is the default
  378. fs.addFunction(
  379. [&ticks] {
  380. if (++ticks == 2) {
  381. std::this_thread::sleep_for(std::chrono::milliseconds(200));
  382. }
  383. },
  384. milliseconds(5));
  385. fs.start();
  386. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  387. // no steady catch up means we'd tick once for 200ms, then remaining
  388. // 300ms / 5 = 60 times
  389. EXPECT_LE(ticks.load(), 61);
  390. }
  391. TEST(FunctionScheduler, SteadyCatchup) {
  392. std::atomic<int> ticks(0);
  393. FunctionScheduler fs;
  394. fs.setSteady(true);
  395. fs.addFunction(
  396. [&ticks] {
  397. if (++ticks == 2) {
  398. std::this_thread::sleep_for(std::chrono::milliseconds(200));
  399. }
  400. },
  401. milliseconds(5));
  402. fs.start();
  403. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  404. // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast
  405. // enough to catch back up to schedule
  406. EXPECT_NEAR(100, ticks.load(), 10);
  407. }
  408. TEST(FunctionScheduler, UniformDistribution) {
  409. atomic<int> total{0};
  410. const int kTicks = 2;
  411. std::chrono::milliseconds minInterval =
  412. testInterval(kTicks) - (timeFactor / 5);
  413. std::chrono::milliseconds maxInterval =
  414. testInterval(kTicks) + (timeFactor / 5);
  415. FunctionScheduler fs;
  416. fs.addFunctionUniformDistribution(
  417. [&] { total += 2; },
  418. minInterval,
  419. maxInterval,
  420. "UniformDistribution",
  421. std::chrono::milliseconds(0));
  422. fs.start();
  423. delay(1);
  424. EXPECT_EQ(2, total);
  425. delay(kTicks);
  426. EXPECT_EQ(4, total);
  427. delay(kTicks);
  428. EXPECT_EQ(6, total);
  429. fs.shutdown();
  430. delay(2);
  431. EXPECT_EQ(6, total);
  432. }
  433. TEST(FunctionScheduler, ConsistentDelay) {
  434. std::atomic<int> ticks(0);
  435. FunctionScheduler fs;
  436. std::atomic<long long> epoch(0);
  437. epoch = duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
  438. .count();
  439. // We should have runs at t = 0, 600, 800, 1200, or 4 total.
  440. // If at const interval, it would be t = 0, 600, 1000, or 3 total.
  441. fs.addFunctionConsistentDelay(
  442. [&ticks, &epoch] {
  443. auto now =
  444. duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
  445. .count();
  446. int t = ++ticks;
  447. if (t != 2) {
  448. // Sensitive to delays above 100ms.
  449. EXPECT_NEAR((now - epoch) - (t - 1) * 400, 0, 100);
  450. }
  451. if (t == 1) {
  452. /* sleep override */
  453. std::this_thread::sleep_for(std::chrono::milliseconds(600));
  454. }
  455. },
  456. milliseconds(400),
  457. "ConsistentDelay");
  458. fs.start();
  459. /* sleep override */
  460. std::this_thread::sleep_for(std::chrono::milliseconds(1300));
  461. EXPECT_EQ(ticks.load(), 4);
  462. }
  463. TEST(FunctionScheduler, ExponentialBackoff) {
  464. atomic<int> total{0};
  465. atomic<int> expectedInterval{0};
  466. atomic<int> nextInterval{2};
  467. FunctionScheduler fs;
  468. fs.addFunctionGenericDistribution(
  469. [&] { total += 2; },
  470. [&expectedInterval, &nextInterval]() mutable {
  471. auto interval = nextInterval.load();
  472. expectedInterval = interval;
  473. nextInterval = interval * interval;
  474. return testInterval(interval);
  475. },
  476. "ExponentialBackoff",
  477. "2^n * 100ms",
  478. std::chrono::milliseconds(0));
  479. fs.start();
  480. delay(1);
  481. EXPECT_EQ(2, total);
  482. delay(expectedInterval);
  483. EXPECT_EQ(4, total);
  484. delay(expectedInterval);
  485. EXPECT_EQ(6, total);
  486. fs.shutdown();
  487. delay(2);
  488. EXPECT_EQ(6, total);
  489. }
  490. TEST(FunctionScheduler, GammaIntervalDistribution) {
  491. atomic<int> total{0};
  492. atomic<int> expectedInterval{0};
  493. FunctionScheduler fs;
  494. std::default_random_engine generator(folly::Random::rand32());
  495. // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
  496. // These values do not matter much in this test, as we are not testing the
  497. // std::gamma_distribution itself...
  498. std::gamma_distribution<double> gamma(2.0, 2.0);
  499. fs.addFunctionGenericDistribution(
  500. [&] { total += 2; },
  501. [&expectedInterval, generator, gamma]() mutable {
  502. expectedInterval =
  503. getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
  504. return testInterval(expectedInterval);
  505. },
  506. "GammaDistribution",
  507. "gamma(2.0,2.0)*100ms",
  508. std::chrono::milliseconds(0));
  509. fs.start();
  510. delay(1);
  511. EXPECT_EQ(2, total);
  512. delay(expectedInterval);
  513. EXPECT_EQ(4, total);
  514. delay(expectedInterval);
  515. EXPECT_EQ(6, total);
  516. fs.shutdown();
  517. delay(2);
  518. EXPECT_EQ(6, total);
  519. }
  520. TEST(FunctionScheduler, AddWithRunOnce) {
  521. atomic<int> total{0};
  522. FunctionScheduler fs;
  523. fs.addFunctionOnce([&] { total += 2; }, "add2");
  524. fs.start();
  525. delay(1);
  526. EXPECT_EQ(2, total);
  527. delay(2);
  528. EXPECT_EQ(2, total);
  529. fs.addFunctionOnce([&] { total += 2; }, "add2");
  530. delay(1);
  531. EXPECT_EQ(4, total);
  532. delay(2);
  533. EXPECT_EQ(4, total);
  534. fs.shutdown();
  535. }
  536. TEST(FunctionScheduler, cancelFunctionAndWait) {
  537. atomic<int> total{0};
  538. FunctionScheduler fs;
  539. fs.addFunction(
  540. [&] {
  541. delay(5);
  542. total += 2;
  543. },
  544. testInterval(100),
  545. "add2");
  546. fs.start();
  547. delay(1);
  548. EXPECT_EQ(0, total); // add2 is still sleeping
  549. EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
  550. EXPECT_EQ(2, total); // add2 should have completed
  551. EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
  552. fs.shutdown();
  553. }
  554. #if defined(__linux__)
  namespace {

  /**
   * A helper class that forces our pthread_create() wrapper to fail when
   * a PThreadCreateFailure object exists.
   *
   * Each live instance holds one count on a shared atomic counter; failure is
   * forced while the counter is positive. Copying is disabled because a copy
   * would skip the increment but still decrement on destruction, leaving the
   * counter unbalanced.
   */
  class PThreadCreateFailure {
   public:
    PThreadCreateFailure() {
      ++forceFailure_;
    }
    ~PThreadCreateFailure() {
      --forceFailure_;
    }

    PThreadCreateFailure(const PThreadCreateFailure&) = delete;
    PThreadCreateFailure& operator=(const PThreadCreateFailure&) = delete;

    /** Returns true while at least one PThreadCreateFailure object is alive. */
    static bool shouldFail() {
      return forceFailure_ > 0;
    }

   private:
    // Number of PThreadCreateFailure objects currently alive.
    static std::atomic<int> forceFailure_;
  };

  std::atomic<int> PThreadCreateFailure::forceFailure_{0};

  } // namespace
  576. // Replace the system pthread_create() function with our own stub, so we can
  577. // trigger failures in the StartThrows() test.
  578. extern "C" int pthread_create(
  579. pthread_t* thread,
  580. const pthread_attr_t* attr,
  581. void* (*start_routine)(void*),
  582. void* arg) {
  583. static const auto realFunction = reinterpret_cast<decltype(&pthread_create)>(
  584. dlsym(RTLD_NEXT, "pthread_create"));
  585. // For sanity, make sure we didn't find ourself,
  586. // since that would cause infinite recursion.
  587. CHECK_NE(realFunction, pthread_create);
  588. if (PThreadCreateFailure::shouldFail()) {
  589. errno = EINVAL;
  590. return -1;
  591. }
  592. return realFunction(thread, attr, start_routine, arg);
  593. }
  594. TEST(FunctionScheduler, StartThrows) {
  595. FunctionScheduler fs;
  596. PThreadCreateFailure fail;
  597. EXPECT_ANY_THROW(fs.start());
  598. EXPECT_NO_THROW(fs.shutdown());
  599. }
  600. #endif
  601. TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
  602. atomic<int> total{0};
  603. FunctionScheduler fs;
  604. fs.addFunction(
  605. [&] {
  606. delay(5);
  607. total += 2;
  608. },
  609. testInterval(100),
  610. "add2");
  611. fs.start();
  612. delay(1);
  613. EXPECT_EQ(0, total); // add2 is still sleeping
  614. fs.cancelAllFunctionsAndWait();
  615. EXPECT_EQ(2, total);
  616. EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
  617. fs.shutdown();
  618. }
  619. TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
  620. folly::Baton<> baton;
  621. std::thread th([&baton]() {
  622. FunctionScheduler fs;
  623. fs.addFunction([] { delay(10); }, testInterval(2), "func");
  624. fs.start();
  625. delay(1);
  626. EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
  627. baton.post();
  628. });
  629. ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  630. th.join();
  631. }
  632. TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
  633. folly::Baton<> baton;
  634. std::thread th([&baton]() {
  635. FunctionScheduler fs;
  636. fs.addFunction([] { delay(10); }, testInterval(2), "func");
  637. fs.start();
  638. delay(1);
  639. fs.cancelAllFunctionsAndWait();
  640. baton.post();
  641. });
  642. ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  643. th.join();
  644. }
  645. TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
  646. folly::Baton<> baton;
  647. std::thread th([&baton]() {
  648. std::atomic<int> nExecuted(0);
  649. FunctionScheduler fs;
  650. fs.addFunction(
  651. [&nExecuted] {
  652. nExecuted++;
  653. delay(10);
  654. },
  655. testInterval(2),
  656. "func0");
  657. fs.addFunction(
  658. [&nExecuted] {
  659. nExecuted++;
  660. delay(10);
  661. },
  662. testInterval(2),
  663. "func1",
  664. testInterval(5));
  665. fs.start();
  666. delay(1);
  667. fs.cancelAllFunctionsAndWait();
  668. EXPECT_EQ(nExecuted, 1);
  669. baton.post();
  670. });
  671. ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
  672. th.join();
  673. }
  674. TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
  675. FunctionScheduler fs;
  676. fs.addFunction([] { delay(10); }, testInterval(2), "func");
  677. fs.start();
  678. delay(1);
  679. std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
  680. delay(1);
  681. std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
  682. th1.join();
  683. th2.join();
  684. }