/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/MPMCQueue.h>
#include <folly/Format.h>
#include <folly/Memory.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
#include <folly/stop_watch.h>
#include <folly/test/DeterministicSchedule.h>

#include <boost/intrusive_ptr.hpp>
#include <boost/thread/barrier.hpp>

#include <functional>
#include <memory>
#include <thread>
#include <utility>

FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(boost::intrusive_ptr)

using namespace folly;
using namespace detail;
using namespace test;
using std::string;
using std::unique_ptr;
using std::vector;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::steady_clock;
using std::chrono::time_point;

typedef DeterministicSchedule DSched;

template <template <typename> class Atom>
void run_mt_sequencer_thread(
    int numThreads,
    int numOps,
    uint32_t init,
    TurnSequencer<Atom>& seq,
    Atom<uint32_t>& spinThreshold,
    int& prev,
    int i) {
  for (int op = i; op < numOps; op += numThreads) {
    seq.waitForTurn(init + op, spinThreshold, (op % 32) == 0);
    EXPECT_EQ(prev, op - 1);
    prev = op;
    seq.completeTurn(init + op);
  }
}

template <template <typename> class Atom>
void run_mt_sequencer_test(int numThreads, int numOps, uint32_t init) {
  TurnSequencer<Atom> seq(init);
  Atom<uint32_t> spinThreshold(0);

  int prev = -1;
  vector<std::thread> threads(numThreads);
  for (int i = 0; i < numThreads; ++i) {
    threads[i] = DSched::thread(std::bind(
        run_mt_sequencer_thread<Atom>,
        numThreads,
        numOps,
        init,
        std::ref(seq),
        std::ref(spinThreshold),
        std::ref(prev),
        i));
  }

  for (auto& thr : threads) {
    DSched::join(thr);
  }

  EXPECT_EQ(prev, numOps - 1);
}
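
// A minimal single-threaded sketch (not part of the original suite) of the
// TurnSequencer protocol that run_mt_sequencer_thread exercises above: a
// participant blocks in waitForTurn() until its turn arrives, then calls
// completeTurn() to hand off to the next turn. The test name and the local
// variable names are assumptions of this sketch.
TEST(MPMCQueue, sequencer_single_thread_sketch) {
  TurnSequencer<std::atomic> seq(0);
  std::atomic<uint32_t> spinThreshold(0);
  for (uint32_t turn = 0; turn < 3; ++turn) {
    // with a single thread each turn is immediately available
    seq.waitForTurn(turn, spinThreshold, false);
    seq.completeTurn(turn);
  }
}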
TEST(MPMCQueue, sequencer) {
  run_mt_sequencer_test<std::atomic>(1, 100, 0);
  run_mt_sequencer_test<std::atomic>(2, 100000, -100);
  run_mt_sequencer_test<std::atomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_emulated_futex) {
  run_mt_sequencer_test<EmulatedFutexAtomic>(1, 100, 0);
  run_mt_sequencer_test<EmulatedFutexAtomic>(2, 100000, -100);
  run_mt_sequencer_test<EmulatedFutexAtomic>(100, 10000, -100);
}

TEST(MPMCQueue, sequencer_deterministic) {
  DSched sched(DSched::uniform(0));
  run_mt_sequencer_test<DeterministicAtomic>(1, 100, -50);
  run_mt_sequencer_test<DeterministicAtomic>(2, 10000, (1 << 29) - 100);
  run_mt_sequencer_test<DeterministicAtomic>(10, 1000, -100);
}

template <bool Dynamic = false, typename T>
void runElementTypeTest(T&& src) {
  MPMCQueue<T, std::atomic, Dynamic> cq(10);
  cq.blockingWrite(std::forward<T>(src));
  T dest;
  cq.blockingRead(dest);
  EXPECT_TRUE(cq.write(std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon1 = std::chrono::system_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon1, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
  auto soon2 = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  EXPECT_TRUE(cq.tryWriteUntil(soon2, std::move(dest)));
  EXPECT_TRUE(cq.read(dest));
}

struct RefCounted {
  static FOLLY_TLS int active_instances;

  mutable std::atomic<int> rc;

  RefCounted() : rc(0) {
    ++active_instances;
  }

  ~RefCounted() {
    --active_instances;
  }
};
FOLLY_TLS int RefCounted::active_instances;

void intrusive_ptr_add_ref(RefCounted const* p) {
  p->rc++;
}

void intrusive_ptr_release(RefCounted const* p) {
  if (--(p->rc) == 0) {
    delete p;
  }
}

TEST(MPMCQueue, lots_of_element_types) {
  runElementTypeTest(10);
  runElementTypeTest(string("abc"));
  runElementTypeTest(std::make_pair(10, string("def")));
  runElementTypeTest(vector<string>{{"abc"}});
  runElementTypeTest(std::make_shared<char>('a'));
  runElementTypeTest(std::make_unique<char>('a'));
  runElementTypeTest(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, lots_of_element_types_dynamic) {
  runElementTypeTest<true>(10);
  runElementTypeTest<true>(string("abc"));
  runElementTypeTest<true>(std::make_pair(10, string("def")));
  runElementTypeTest<true>(vector<string>{{"abc"}});
  runElementTypeTest<true>(std::make_shared<char>('a'));
  runElementTypeTest<true>(std::make_unique<char>('a'));
  runElementTypeTest<true>(boost::intrusive_ptr<RefCounted>(new RefCounted));
  EXPECT_EQ(RefCounted::active_instances, 0);
}

TEST(MPMCQueue, single_thread_enqdeq) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  MPMCQueue<int> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    for (int i = 0; i < 10; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(-1));
    EXPECT_FALSE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 10);

    for (int i = 0; i < 5; ++i) {
      int dest = -1;
      EXPECT_TRUE(cq.read(dest));
      EXPECT_EQ(dest, i);
    }
    for (int i = 5; i < 10; ++i) {
      int dest = -1;
      cq.blockingRead(dest);
      EXPECT_EQ(dest, i);
    }
    int dest = -1;
    EXPECT_FALSE(cq.read(dest));
    EXPECT_EQ(dest, -1);

    EXPECT_TRUE(cq.isEmpty());
    EXPECT_EQ(cq.size(), 0);
  }
}
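
// A hedged companion sketch (not from the original suite) for the dynamic
// queue, which the comment above excludes: it checks only the FIFO
// round-trip of values, not the exact full/empty boundaries, since the
// dynamic version may transiently hold more capacity than requested. The
// test name is an assumption of this sketch.
TEST(MPMCQueue, single_thread_enqdeq_dynamic_sketch) {
  MPMCQueue<int, std::atomic, true> cq(10);

  for (int pass = 0; pass < 10; ++pass) {
    for (int i = 0; i < 10; ++i) {
      cq.blockingWrite(i);
    }
    for (int i = 0; i < 10; ++i) {
      int dest = -1;
      cq.blockingRead(dest);
      // single-threaded use is strictly FIFO, even across expansion
      EXPECT_EQ(dest, i);
    }
    EXPECT_TRUE(cq.isEmpty());
  }
}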
TEST(MPMCQueue, tryenq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (size_t cap = 1; cap < 100; ++cap) {
    MPMCQueue<int> cq(cap);
    for (size_t i = 0; i < cap; ++i) {
      EXPECT_TRUE(cq.write(i));
    }
    EXPECT_FALSE(cq.write(100));
  }
}

TEST(MPMCQueue, enq_capacity_test) {
  // Non-dynamic version only.
  // False positive for dynamic version. Capacity can be temporarily
  // higher than specified.
  for (auto cap : {1, 100, 10000}) {
    MPMCQueue<int> cq(cap);
    for (int i = 0; i < cap; ++i) {
      cq.blockingWrite(i);
    }
    int t = 0;
    int when;
    auto thr = std::thread([&] {
      cq.blockingWrite(100);
      when = t;
    });
    usleep(2000);
    t = 1;
    int dummy;
    cq.blockingRead(dummy);
    thr.join();
    EXPECT_EQ(when, 1);
  }
}

template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  int src = t;
  // received doesn't reflect any actual values, we just start with
  // t and increment by numThreads to get the rounding of termination
  // correct if numThreads doesn't evenly divide numOps
  int received = t;
  while (src < n || received < n) {
    if (src < n && cq.write(src)) {
      src += numThreads;
    }

    int dst;
    if (received < n && cq.read(dst)) {
      received += numThreads;
      threadSum += dst;
    }
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
void runTryEnqDeqTest(int numThreads, int numOps) {
  // write and read aren't linearizable, so we don't have
  // hard guarantees on their individual behavior. We can still test
  // correctness in aggregate
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runTryEnqDeqThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
}

TEST(MPMCQueue, mt_try_enq_deq) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_dynamic) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<std::atomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_emulated_futex_dynamic) {
  int nts[] = {1, 3, 100};

  int n = 100000;
  for (int nt : nts) {
    runTryEnqDeqTest<EmulatedFutexAtomic, /* Dynamic = */ true>(nt, n);
  }
}

TEST(MPMCQueue, mt_try_enq_deq_deterministic) {
  int nts[] = {3, 10};

  long seed = 0;
  LOG(INFO) << "using seed " << seed;

  int n = 1000;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic>(nt, n);
    }
    {
      DSched sched(DSched::uniform(seed));
      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runTryEnqDeqTest<DeterministicAtomic, /*Dynamic = */ true>(nt, n);
    }
  }
}

uint64_t nowMicro() {
  timeval tv;
  gettimeofday(&tv, nullptr);
  return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
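
// Note: since the callers below only ever subtract two samples of nowMicro()
// to form a duration, an equivalent <chrono>-based sketch (an alternative,
// not the original implementation) would be:
//
//   return std::chrono::duration_cast<std::chrono::microseconds>(
//              std::chrono::steady_clock::now().time_since_epoch())
//       .count();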
template <typename Q>
struct WriteMethodCaller {
  WriteMethodCaller() {}
  virtual ~WriteMethodCaller() = default;
  virtual bool callWrite(Q& q, int i) = 0;
  virtual string methodName() = 0;
};

template <typename Q>
struct BlockingWriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    q.blockingWrite(i);
    return true;
  }
  string methodName() override {
    return "blockingWrite";
  }
};

template <typename Q>
struct WriteIfNotFullCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    return q.writeIfNotFull(i);
  }
  string methodName() override {
    return "writeIfNotFull";
  }
};

template <typename Q>
struct WriteCaller : public WriteMethodCaller<Q> {
  bool callWrite(Q& q, int i) override {
    return q.write(i);
  }
  string methodName() override {
    return "write";
  }
};

template <
    typename Q,
    class Clock = steady_clock,
    class Duration = typename Clock::duration>
struct TryWriteUntilCaller : public WriteMethodCaller<Q> {
  const Duration duration_;

  explicit TryWriteUntilCaller(Duration&& duration) : duration_(duration) {}

  bool callWrite(Q& q, int i) override {
    auto then = Clock::now() + duration_;
    return q.tryWriteUntil(then, i);
  }

  string methodName() override {
    return folly::sformat(
        "tryWriteUntil({}ms)",
        std::chrono::duration_cast<milliseconds>(duration_).count());
  }
};

template <typename Q>
string producerConsumerBench(
    Q&& queue,
    string qName,
    int numProducers,
    int numConsumers,
    int numOps,
    WriteMethodCaller<Q>& writer,
    bool ignoreContents = false) {
  Q& q = queue;

  struct rusage beginUsage;
  getrusage(RUSAGE_SELF, &beginUsage);

  auto beginMicro = nowMicro();

  uint64_t n = numOps;
  std::atomic<uint64_t> sum(0);
  std::atomic<uint64_t> failed(0);

  vector<std::thread> producers(numProducers);
  for (int t = 0; t < numProducers; ++t) {
    producers[t] = DSched::thread([&, t] {
      for (int i = t; i < numOps; i += numProducers) {
        while (!writer.callWrite(q, i)) {
          ++failed;
        }
      }
    });
  }

  vector<std::thread> consumers(numConsumers);
  for (int t = 0; t < numConsumers; ++t) {
    consumers[t] = DSched::thread([&, t] {
      uint64_t localSum = 0;
      for (int i = t; i < numOps; i += numConsumers) {
        int dest = -1;
        q.blockingRead(dest);
        EXPECT_FALSE(dest == -1);
        localSum += dest;
      }
      sum += localSum;
    });
  }

  for (auto& t : producers) {
    DSched::join(t);
  }
  for (auto& t : consumers) {
    DSched::join(t);
  }
  if (!ignoreContents) {
    EXPECT_EQ(n * (n - 1) / 2 - sum, 0);
  }

  auto endMicro = nowMicro();

  struct rusage endUsage;
  getrusage(RUSAGE_SELF, &endUsage);

  uint64_t nanosPer = (1000 * (endMicro - beginMicro)) / n;
  long csw = endUsage.ru_nvcsw + endUsage.ru_nivcsw -
      (beginUsage.ru_nvcsw + beginUsage.ru_nivcsw);
  uint64_t failures = failed;
  size_t allocated = q.allocatedCapacity();

  return folly::sformat(
      "{}, {} {} producers, {} consumers => {} nanos/handoff, {} csw / {} "
      "handoff, {} failures, {} allocated",
      qName,
      numProducers,
      writer.methodName(),
      numConsumers,
      nanosPer,
      csw,
      n,
      failures,
      allocated);
}

template <bool Dynamic = false>
void runMtProdConsDeterministic(long seed) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, Dynamic>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));
  size_t cap;

  for (const auto& caller : callers) {
    cap = 10;
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        1,
        1,
        1000,
        *caller);
    cap = 100;
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
    cap = 10;
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        1,
        1,
        1000,
        *caller);
    cap = 100;
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
    cap = 1;
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, Dynamic>(cap),
        "MPMCQueue<int, DeterministicAtomic, Dynamic>(" +
            folly::to<std::string>(cap) + ")",
        10,
        10,
        1000,
        *caller);
  }
}

void runMtProdConsDeterministicDynamic(
    long seed,
    uint32_t prods,
    uint32_t cons,
    uint32_t numOps,
    size_t cap,
    size_t minCap,
    size_t mult) {
  // we use the Bench method, but perf results are meaningless under DSched
  DSched sched(DSched::uniform(seed));

  using QueueType = MPMCQueue<int, DeterministicAtomic, true>;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << producerConsumerBench(
        MPMCQueue<int, DeterministicAtomic, true>(cap, minCap, mult),
        "MPMCQueue<int, DeterministicAtomic, true>(" +
            folly::to<std::string>(cap) + ", " +
            folly::to<std::string>(minCap) + ", " +
            folly::to<std::string>(mult) + ")",
        prods,
        cons,
        numOps,
        *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons_deterministic) {
  runMtProdConsDeterministic(0);
}

TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic) {
  runMtProdConsDeterministic<true>(0);
}

template <typename T>
void setFromEnv(T& var, const char* envvar) {
  char* str = std::getenv(envvar);
  if (str) {
    var = atoi(str);
  }
}

TEST(MPMCQueue, mt_prod_cons_deterministic_dynamic_with_arguments) {
  long seed = 0;
  uint32_t prods = 10;
  uint32_t cons = 10;
  uint32_t numOps = 1000;
  size_t cap = 10000;
  size_t minCap = 9;
  size_t mult = 3;
  setFromEnv(seed, "SEED");
  setFromEnv(prods, "PRODS");
  setFromEnv(cons, "CONS");
  setFromEnv(numOps, "NUM_OPS");
  setFromEnv(cap, "CAP");
  setFromEnv(minCap, "MIN_CAP");
  setFromEnv(mult, "MULT");
  runMtProdConsDeterministicDynamic(
      seed, prods, cons, numOps, cap, minCap, mult);
}
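
// Example invocation of the parameterized test above (the binary name is an
// assumption; substitute the actual gtest target for this file):
//
//   SEED=1 PRODS=4 CONS=4 NUM_OPS=10000 ./mpmc_queue_test \
//     --gtest_filter='MPMCQueue.mt_prod_cons_deterministic_dynamic_with_arguments'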
#define PC_BENCH(q, np, nc, ...) \
  producerConsumerBench(q, #q, (np), (nc), __VA_ARGS__)

template <bool Dynamic = false>
void runMtProdCons() {
  using QueueType = MPMCQueue<int, std::atomic, Dynamic>;

  int n = 100000;
  setFromEnv(n, "NUM_OPS");

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons) {
  runMtProdCons();
}

TEST(MPMCQueue, mt_prod_cons_dynamic) {
  runMtProdCons</* Dynamic = */ true>();
}

template <bool Dynamic = false>
void runMtProdConsEmulatedFutex() {
  using QueueType = MPMCQueue<int, EmulatedFutexAtomic, Dynamic>;

  int n = 100000;

  vector<unique_ptr<WriteMethodCaller<QueueType>>> callers;
  callers.emplace_back(std::make_unique<BlockingWriteCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteIfNotFullCaller<QueueType>>());
  callers.emplace_back(std::make_unique<WriteCaller<QueueType>>());
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(milliseconds(1)));
  callers.emplace_back(
      std::make_unique<TryWriteUntilCaller<QueueType>>(seconds(2)));

  for (const auto& caller : callers) {
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 1, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 1, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(10000)), 10, 10, n, *caller);
    LOG(INFO) << PC_BENCH((QueueType(100000)), 32, 100, n, *caller);
  }
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex) {
  runMtProdConsEmulatedFutex();
}

TEST(MPMCQueue, mt_prod_cons_emulated_futex_dynamic) {
  runMtProdConsEmulatedFutex</* Dynamic = */ true>();
}

template <template <typename> class Atom, bool Dynamic = false>
void runNeverFailThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    EXPECT_TRUE(cq.writeIfNotFull(i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailThread<Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

template <template <typename> class Atom, bool Dynamic = false>
void runMtNeverFail(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed = runNeverFailTest<Atom, Dynamic>(nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

// All the never_fail tests are for the non-dynamic version only.
// False positive for dynamic version. Some writeIfNotFull() and
// tryWriteUntil() operations may fail in transient conditions related
// to expansion.
TEST(MPMCQueue, mt_never_fail) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFail<std::atomic>(nts, n);
}

TEST(MPMCQueue, mt_never_fail_emulated_futex) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFail<EmulatedFutexAtomic>(nts, n);
}

template <bool Dynamic = false>
void runMtNeverFailDeterministic(std::vector<int>& nts, int n, long seed) {
  LOG(INFO) << "using seed " << seed;
  for (int nt : nts) {
    {
      DSched sched(DSched::uniform(seed));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
    {
      DSched sched(DSched::uniformSubset(seed, 2));
      runNeverFailTest<DeterministicAtomic, Dynamic>(nt, n);
    }
  }
}

TEST(MPMCQueue, mt_never_fail_deterministic) {
  std::vector<int> nts{3, 10};
  long seed = 0; // nowMicro() % 10000;
  int n = 1000;
  runMtNeverFailDeterministic(nts, n, seed);
}

template <class Clock, template <typename> class Atom, bool Dynamic>
void runNeverFailUntilThread(
    int numThreads,
    int n, /*numOps*/
    MPMCQueue<int, Atom, Dynamic>& cq,
    std::atomic<uint64_t>& sum,
    int t) {
  uint64_t threadSum = 0;
  for (int i = t; i < n; i += numThreads) {
    // enq + deq
    auto soon = Clock::now() + std::chrono::seconds(1);
    EXPECT_TRUE(cq.tryWriteUntil(soon, i));

    int dest = -1;
    EXPECT_TRUE(cq.readIfNotEmpty(dest));
    EXPECT_TRUE(dest >= 0);
    threadSum += dest;
  }
  sum += threadSum;
}

template <class Clock, template <typename> class Atom, bool Dynamic = false>
uint64_t runNeverFailTest(int numThreads, int numOps) {
  // always #enq >= #deq
  MPMCQueue<int, Atom, Dynamic> cq(numThreads);

  uint64_t n = numOps;
  auto beginMicro = nowMicro();

  vector<std::thread> threads(numThreads);
  std::atomic<uint64_t> sum(0);
  for (int t = 0; t < numThreads; ++t) {
    threads[t] = DSched::thread(std::bind(
        runNeverFailUntilThread<Clock, Atom, Dynamic>,
        numThreads,
        n,
        std::ref(cq),
        std::ref(sum),
        t));
  }
  for (auto& t : threads) {
    DSched::join(t);
  }
  EXPECT_TRUE(cq.isEmpty());
  EXPECT_EQ(n * (n - 1) / 2 - sum, 0);

  return nowMicro() - beginMicro;
}

template <bool Dynamic = false>
void runMtNeverFailUntilSystem(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::system_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_until_system) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFailUntilSystem(nts, n);
}

template <bool Dynamic = false>
void runMtNeverFailUntilSteady(std::vector<int>& nts, int n) {
  for (int nt : nts) {
    uint64_t elapsed =
        runNeverFailTest<std::chrono::steady_clock, std::atomic, Dynamic>(
            nt, n);
    LOG(INFO) << (elapsed * 1000.0) / (n * 2) << " nanos per op with " << nt
              << " threads";
  }
}

TEST(MPMCQueue, mt_never_fail_until_steady) {
  std::vector<int> nts{1, 3, 100};
  int n = 100000;
  runMtNeverFailUntilSteady(nts, n);
}

enum LifecycleEvent {
  NOTHING = -1,
  DEFAULT_CONSTRUCTOR,
  COPY_CONSTRUCTOR,
  MOVE_CONSTRUCTOR,
  TWO_ARG_CONSTRUCTOR,
  COPY_OPERATOR,
  MOVE_OPERATOR,
  DESTRUCTOR,
  MAX_LIFECYCLE_EVENT
};

static FOLLY_TLS int lc_counts[MAX_LIFECYCLE_EVENT];
static FOLLY_TLS int lc_prev[MAX_LIFECYCLE_EVENT];

static int lc_outstanding() {
  return lc_counts[DEFAULT_CONSTRUCTOR] + lc_counts[COPY_CONSTRUCTOR] +
      lc_counts[MOVE_CONSTRUCTOR] + lc_counts[TWO_ARG_CONSTRUCTOR] -
      lc_counts[DESTRUCTOR];
}

static void lc_snap() {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    lc_prev[i] = lc_counts[i];
  }
}

#define LIFECYCLE_STEP(...) lc_step(__LINE__, __VA_ARGS__)

static void lc_step(int lineno, int what = NOTHING, int what2 = NOTHING) {
  for (int i = 0; i < MAX_LIFECYCLE_EVENT; ++i) {
    int delta = i == what || i == what2 ? 1 : 0;
    EXPECT_EQ(lc_counts[i] - lc_prev[i], delta)
        << "lc_counts[" << i << "] - lc_prev[" << i << "] was "
        << (lc_counts[i] - lc_prev[i]) << ", expected " << delta
        << ", from line " << lineno;
  }
  lc_snap();
}

template <typename R>
struct Lifecycle {
  typedef R IsRelocatable;

  bool constructed;

  Lifecycle() noexcept : constructed(true) {
    ++lc_counts[DEFAULT_CONSTRUCTOR];
  }

  explicit Lifecycle(int /* n */, char const* /* s */) noexcept
      : constructed(true) {
    ++lc_counts[TWO_ARG_CONSTRUCTOR];
  }

  Lifecycle(const Lifecycle& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[COPY_CONSTRUCTOR];
  }

  Lifecycle(Lifecycle&& /* rhs */) noexcept : constructed(true) {
    ++lc_counts[MOVE_CONSTRUCTOR];
  }

  Lifecycle& operator=(const Lifecycle& /* rhs */) noexcept {
    ++lc_counts[COPY_OPERATOR];
    return *this;
  }

  Lifecycle& operator=(Lifecycle&& /* rhs */) noexcept {
    ++lc_counts[MOVE_OPERATOR];
    return *this;
  }

  ~Lifecycle() noexcept {
    ++lc_counts[DESTRUCTOR];
    assert(lc_outstanding() >= 0);
    assert(constructed);
    constructed = false;
  }
};
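
// A minimal hedged sketch (hypothetical, single-threaded) of how the
// counters above are meant to be read; each LIFECYCLE_STEP asserts that
// exactly the named events, and no others, occurred since the previous
// snapshot:
//
//   lc_snap();
//   {
//     Lifecycle<std::true_type> x;
//     LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
//   } // x goes out of scope
//   LIFECYCLE_STEP(DESTRUCTOR);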
template <typename R>
void runPerfectForwardingTest() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    // Non-dynamic only. False positive for dynamic.
    MPMCQueue<Lifecycle<R>, std::atomic> queue(50);
    LIFECYCLE_STEP(NOTHING);

    for (int pass = 0; pass < 10; ++pass) {
      for (int i = 0; i < 10; ++i) {
        queue.blockingWrite();
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

        queue.blockingWrite(1, "one");
        LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(std::move(src));
          LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        {
          Lifecycle<R> src;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
          queue.blockingWrite(src);
          LIFECYCLE_STEP(COPY_CONSTRUCTOR);
        }
        LIFECYCLE_STEP(DESTRUCTOR);

        EXPECT_TRUE(queue.write());
        LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 50);
      EXPECT_FALSE(queue.write(2, "two"));
      LIFECYCLE_STEP(NOTHING);

      for (int i = 0; i < 50; ++i) {
        {
          Lifecycle<R> node;
          LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

          queue.blockingRead(node);
          if (R::value) {
            // relocatable, moved via memcpy
            LIFECYCLE_STEP(DESTRUCTOR);
          } else {
            LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);
          }
        }
        LIFECYCLE_STEP(DESTRUCTOR);
      }

      EXPECT_EQ(queue.size(), 0);
    }

    // put one element back before destruction
    {
      Lifecycle<R> src(3, "three");
      LIFECYCLE_STEP(TWO_ARG_CONSTRUCTOR);
      queue.write(std::move(src));
      LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
    }
    LIFECYCLE_STEP(DESTRUCTOR); // destroy src
  }
  LIFECYCLE_STEP(DESTRUCTOR); // destroy queue

  EXPECT_EQ(lc_outstanding(), 0);
}

TEST(MPMCQueue, perfect_forwarding) {
  runPerfectForwardingTest<std::false_type>();
}

TEST(MPMCQueue, perfect_forwarding_relocatable) {
  runPerfectForwardingTest<std::true_type>();
}

template <bool Dynamic = false>
void run_queue_moving() {
  lc_snap();
  EXPECT_EQ(lc_outstanding(), 0);

  {
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> a(50);
    LIFECYCLE_STEP(NOTHING);

    a.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move constructor
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> b =
        std::move(a);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(a.capacity(), 0);
    EXPECT_EQ(a.size(), 0);
    EXPECT_EQ(b.capacity(), 50);
    EXPECT_EQ(b.size(), 1);

    b.blockingWrite();
    LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);

    // move operator
    MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> c;
    LIFECYCLE_STEP(NOTHING);
    c = std::move(b);
    LIFECYCLE_STEP(NOTHING);
    EXPECT_EQ(c.capacity(), 50);
    EXPECT_EQ(c.size(), 2);

    {
      Lifecycle<std::false_type> dst;
      LIFECYCLE_STEP(DEFAULT_CONSTRUCTOR);
      c.blockingRead(dst);
      LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

      {
        // swap
        MPMCQueue<Lifecycle<std::false_type>, std::atomic, Dynamic> d(10);
        LIFECYCLE_STEP(NOTHING);
        std::swap(c, d);
        LIFECYCLE_STEP(NOTHING);
        EXPECT_EQ(c.capacity(), 10);
        EXPECT_TRUE(c.isEmpty());
        EXPECT_EQ(d.capacity(), 50);
        EXPECT_EQ(d.size(), 1);

        d.blockingRead(dst);
        LIFECYCLE_STEP(DESTRUCTOR, MOVE_OPERATOR);

        c.blockingWrite(dst);
        LIFECYCLE_STEP(COPY_CONSTRUCTOR);

        d.blockingWrite(std::move(dst));
        LIFECYCLE_STEP(MOVE_CONSTRUCTOR);
      } // d goes out of scope
      LIFECYCLE_STEP(DESTRUCTOR);
    } // dst goes out of scope
    LIFECYCLE_STEP(DESTRUCTOR);
  } // c goes out of scope
  LIFECYCLE_STEP(DESTRUCTOR);
}

TEST(MPMCQueue, queue_moving) {
  run_queue_moving();
}

TEST(MPMCQueue, queue_moving_dynamic) {
  run_queue_moving<true>();
}

TEST(MPMCQueue, explicit_zero_capacity_fail) {
  ASSERT_THROW(MPMCQueue<int> cq(0), std::invalid_argument);

  using DynamicMPMCQueueInt = MPMCQueue<int, std::atomic, true>;
  ASSERT_THROW(DynamicMPMCQueueInt cq(0), std::invalid_argument);
}

template <bool Dynamic>
void testTryReadUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  int vals[2];
  std::vector<std::thread> threads;
  boost::barrier b{3};
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryReadUntil(watch.getCheckpoint() + wait, vals[i]);
    });
  }

  b.wait();
  EXPECT_TRUE(q.write(42));

  for (int i = 0; i < 2; i++) {
    threads[i].join();
  }

  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    if (rets[i]) {
      EXPECT_EQ(42, vals[i]);
      EXPECT_FALSE(rets[other]);
    }
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

template <bool Dynamic>
void testTryWriteUntil() {
  MPMCQueue<int, std::atomic, Dynamic> q{1};
  EXPECT_TRUE(q.write(42));

  const auto wait = std::chrono::milliseconds(100);
  stop_watch<> watch;
  bool rets[2];
  std::vector<std::thread> threads;
  boost::barrier b{3};
  for (int i = 0; i < 2; i++) {
    threads.emplace_back([&, i] {
      b.wait();
      rets[i] = q.tryWriteUntil(watch.getCheckpoint() + wait, i);
    });
  }

  b.wait();
  int x;
  EXPECT_TRUE(q.read(x));
  EXPECT_EQ(42, x);

  for (int i = 0; i < 2; i++) {
    threads[i].join();
  }

  EXPECT_TRUE(q.read(x));
  for (int i = 0; i < 2; i++) {
    int other = (i + 1) % 2;
    if (rets[i]) {
      EXPECT_EQ(i, x);
      EXPECT_FALSE(rets[other]);
    }
  }

  EXPECT_TRUE(watch.elapsed(wait));
}

TEST(MPMCQueue, try_read_until) {
  testTryReadUntil<false>();
}

TEST(MPMCQueue, try_read_until_dynamic) {
  testTryReadUntil<true>();
}

TEST(MPMCQueue, try_write_until) {
  testTryWriteUntil<false>();
}

TEST(MPMCQueue, try_write_until_dynamic) {
  testTryWriteUntil<true>();
}

template <bool Dynamic>
void testTimeout(MPMCQueue<int, std::atomic, Dynamic>& q) {
  CHECK(q.write(1));
  /* The following must not block forever */
  q.tryWriteUntil(
      std::chrono::system_clock::now() + std::chrono::microseconds(10000), 2);
}

TEST(MPMCQueue, try_write_until_timeout) {
  folly::MPMCQueue<int, std::atomic, false> queue(1);
  testTimeout<false>(queue);
}

TEST(MPMCQueue, must_fail_try_write_until_dynamic) {
  folly::MPMCQueue<int, std::atomic, true> queue(200, 1, 2);
  testTimeout<true>(queue);
}