NotificationQueueTest.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/async/NotificationQueue.h>
  17. #include <sys/types.h>
  18. #include <iostream>
  19. #include <list>
  20. #include <thread>
  21. #include <folly/io/async/ScopedEventBaseThread.h>
  22. #include <folly/portability/GTest.h>
  23. #include <folly/synchronization/Baton.h>
  24. #ifndef _WIN32
  25. #include <sys/wait.h>
  26. #endif
  27. using namespace std;
  28. using namespace folly;
  29. typedef NotificationQueue<int> IntQueue;
  30. class QueueConsumer : public IntQueue::Consumer {
  31. public:
  32. QueueConsumer() {}
  33. void messageAvailable(int&& value) noexcept override {
  34. messages.push_back(value);
  35. if (fn) {
  36. fn(value);
  37. }
  38. }
  39. std::function<void(int)> fn;
  40. std::deque<int> messages;
  41. };
  42. class QueueTest {
  43. public:
  44. explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
  45. : queue(maxSize, type), terminationQueue(maxSize, type) {}
  46. void sendOne();
  47. void putMessages();
  48. void multiConsumer();
  49. void maxQueueSize();
  50. void maxReadAtOnce();
  51. void destroyCallback();
  52. void useAfterFork();
  53. IntQueue queue;
  54. IntQueue terminationQueue;
  55. };
  56. void QueueTest::sendOne() {
  57. // Create a notification queue and a callback in this thread
  58. EventBase eventBase;
  59. QueueConsumer consumer;
  60. consumer.fn = [&](int) {
  61. // Stop consuming after we receive 1 message
  62. consumer.stopConsuming();
  63. };
  64. consumer.startConsuming(&eventBase, &queue);
  65. // Start a new EventBase thread to put a message on our queue
  66. ScopedEventBaseThread t1;
  67. t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); });
  68. // Loop until we receive the message
  69. eventBase.loop();
  70. const auto& messages = consumer.messages;
  71. EXPECT_EQ(1, messages.size());
  72. EXPECT_EQ(5, messages.at(0));
  73. }
  74. void QueueTest::putMessages() {
  75. EventBase eventBase;
  76. QueueConsumer consumer;
  77. QueueConsumer consumer2;
  78. consumer.fn = [&](int msg) {
  79. // Stop consuming after we receive a message with value 0, and start
  80. // consumer2
  81. if (msg == 0) {
  82. consumer.stopConsuming();
  83. consumer2.startConsuming(&eventBase, &queue);
  84. }
  85. };
  86. consumer2.fn = [&](int msg) {
  87. // Stop consuming after we receive a message with value 0
  88. if (msg == 0) {
  89. consumer2.stopConsuming();
  90. }
  91. };
  92. consumer.startConsuming(&eventBase, &queue);
  93. list<int> msgList = {1, 2, 3, 4};
  94. vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
  95. // Call putMessages() several times to add messages to the queue
  96. queue.putMessages(msgList.begin(), msgList.end());
  97. queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
  98. // Test sending 17 messages, the pipe-based queue calls write in 16 byte
  99. // chunks
  100. queue.putMessages(msgVector.begin(), msgVector.end());
  101. // Loop until the consumer has stopped
  102. eventBase.loop();
  103. vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
  104. vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
  105. EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
  106. for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
  107. EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
  108. }
  109. EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
  110. for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
  111. EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
  112. }
  113. }
  114. void QueueTest::multiConsumer() {
  115. uint32_t numConsumers = 8;
  116. uint32_t numMessages = 10000;
  117. // Create several consumers each running in their own EventBase thread
  118. vector<QueueConsumer> consumers(numConsumers);
  119. vector<ScopedEventBaseThread> threads(numConsumers);
  120. for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
  121. QueueConsumer* consumer = &consumers[consumerIdx];
  122. consumer->fn = [consumer, consumerIdx, this](int value) {
  123. // Treat 0 as a signal to stop.
  124. if (value == 0) {
  125. consumer->stopConsuming();
  126. // Put a message on the terminationQueue to indicate we have stopped
  127. terminationQueue.putMessage(consumerIdx);
  128. }
  129. };
  130. EventBase* eventBase = threads[consumerIdx].getEventBase();
  131. eventBase->runInEventBaseThread([eventBase, consumer, this] {
  132. consumer->startConsuming(eventBase, &queue);
  133. });
  134. }
  135. // Now add a number of messages from this thread
  136. // Start at 1 rather than 0, since 0 is the signal to stop.
  137. for (uint32_t n = 1; n < numMessages; ++n) {
  138. queue.putMessage(n);
  139. }
  140. // Now add a 0 for each consumer, to signal them to stop
  141. for (uint32_t n = 0; n < numConsumers; ++n) {
  142. queue.putMessage(0);
  143. }
  144. // Wait until we get notified that all of the consumers have stopped
  145. // We use a separate notification queue for this.
  146. QueueConsumer terminationConsumer;
  147. vector<uint32_t> consumersStopped(numConsumers, 0);
  148. uint32_t consumersRemaining = numConsumers;
  149. terminationConsumer.fn = [&](int consumerIdx) {
  150. --consumersRemaining;
  151. if (consumersRemaining == 0) {
  152. terminationConsumer.stopConsuming();
  153. }
  154. EXPECT_GE(consumerIdx, 0);
  155. EXPECT_LT(consumerIdx, numConsumers);
  156. ++consumersStopped[consumerIdx];
  157. };
  158. EventBase eventBase;
  159. terminationConsumer.startConsuming(&eventBase, &terminationQueue);
  160. eventBase.loop();
  161. // Verify that we saw exactly 1 stop message for each consumer
  162. for (uint32_t n = 0; n < numConsumers; ++n) {
  163. EXPECT_EQ(1, consumersStopped[n]);
  164. }
  165. // Validate that every message sent to the main queue was received exactly
  166. // once.
  167. vector<int> messageCount(numMessages, 0);
  168. for (uint32_t n = 0; n < numConsumers; ++n) {
  169. for (int msg : consumers[n].messages) {
  170. EXPECT_GE(msg, 0);
  171. EXPECT_LT(msg, numMessages);
  172. ++messageCount[msg];
  173. }
  174. }
  175. // 0 is the signal to stop, and should have been received once by each
  176. // consumer
  177. EXPECT_EQ(numConsumers, messageCount[0]);
  178. // All other messages should have been received exactly once
  179. for (uint32_t n = 1; n < numMessages; ++n) {
  180. EXPECT_EQ(1, messageCount[n]);
  181. }
  182. }
  183. void QueueTest::maxQueueSize() {
  184. // Create a queue with a maximum size of 5, and fill it up
  185. for (int n = 0; n < 5; ++n) {
  186. queue.tryPutMessage(n);
  187. }
  188. // Calling tryPutMessage() now should fail
  189. EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
  190. EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
  191. int val = 5;
  192. EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
  193. // Pop a message from the queue
  194. int result = -1;
  195. EXPECT_TRUE(queue.tryConsume(result));
  196. EXPECT_EQ(0, result);
  197. // We should be able to write another message now that we popped one off.
  198. queue.tryPutMessage(5);
  199. // But now we are full again.
  200. EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
  201. // putMessage() should let us exceed the maximum
  202. queue.putMessage(6);
  203. // Pull another mesage off
  204. EXPECT_TRUE(queue.tryConsume(result));
  205. EXPECT_EQ(1, result);
  206. // tryPutMessage() should still fail since putMessage() actually put us over
  207. // the max.
  208. EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
  209. // Pull another message off and try again
  210. EXPECT_TRUE(queue.tryConsume(result));
  211. EXPECT_EQ(2, result);
  212. queue.tryPutMessage(7);
  213. // Now pull all the remaining messages off
  214. EXPECT_TRUE(queue.tryConsume(result));
  215. EXPECT_EQ(3, result);
  216. EXPECT_TRUE(queue.tryConsume(result));
  217. EXPECT_EQ(4, result);
  218. EXPECT_TRUE(queue.tryConsume(result));
  219. EXPECT_EQ(5, result);
  220. EXPECT_TRUE(queue.tryConsume(result));
  221. EXPECT_EQ(6, result);
  222. EXPECT_TRUE(queue.tryConsume(result));
  223. EXPECT_EQ(7, result);
  224. // There should be no messages left
  225. result = -1;
  226. EXPECT_TRUE(!queue.tryConsume(result));
  227. EXPECT_EQ(-1, result);
  228. }
  229. void QueueTest::maxReadAtOnce() {
  230. // Add 100 messages to the queue
  231. for (int n = 0; n < 100; ++n) {
  232. queue.putMessage(n);
  233. }
  234. EventBase eventBase;
  235. // Record how many messages were processed each loop iteration.
  236. uint32_t messagesThisLoop = 0;
  237. std::vector<uint32_t> messagesPerLoop;
  238. std::function<void()> loopFinished = [&] {
  239. // Record the current number of messages read this loop
  240. messagesPerLoop.push_back(messagesThisLoop);
  241. // Reset messagesThisLoop to 0 for the next loop
  242. messagesThisLoop = 0;
  243. // To prevent use-after-free bugs when eventBase destructs,
  244. // prevent calling runInLoop any more after the test is finished.
  245. // 55 == number of times loop should run.
  246. if (messagesPerLoop.size() != 55) {
  247. // Reschedule ourself to run at the end of the next loop
  248. eventBase.runInLoop(loopFinished);
  249. }
  250. };
  251. // Schedule the first call to loopFinished
  252. eventBase.runInLoop(loopFinished);
  253. QueueConsumer consumer;
  254. // Read the first 50 messages 10 at a time.
  255. consumer.setMaxReadAtOnce(10);
  256. consumer.fn = [&](int value) {
  257. ++messagesThisLoop;
  258. // After 50 messages, drop to reading only 1 message at a time.
  259. if (value == 50) {
  260. consumer.setMaxReadAtOnce(1);
  261. }
  262. // Terminate the loop when we reach the end of the messages.
  263. if (value == 99) {
  264. eventBase.terminateLoopSoon();
  265. }
  266. };
  267. consumer.startConsuming(&eventBase, &queue);
  268. // Run the event loop until the consumer terminates it
  269. eventBase.loop();
  270. // The consumer should have read all 100 messages in order
  271. EXPECT_EQ(100, consumer.messages.size());
  272. for (int n = 0; n < 100; ++n) {
  273. EXPECT_EQ(n, consumer.messages.at(n));
  274. }
  275. // Currently EventBase happens to still run the loop callbacks even after
  276. // terminateLoopSoon() is called. However, we don't really want to depend on
  277. // this behavior. In case this ever changes in the future, add
  278. // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
  279. // last loop iteration.
  280. if (messagesThisLoop > 0) {
  281. messagesPerLoop.push_back(messagesThisLoop);
  282. messagesThisLoop = 0;
  283. }
  284. // For the first 5 loops it should have read 10 messages each time.
  285. // After that it should have read 1 messages per loop for the next 50 loops.
  286. EXPECT_EQ(55, messagesPerLoop.size());
  287. for (int n = 0; n < 5; ++n) {
  288. EXPECT_EQ(10, messagesPerLoop.at(n));
  289. }
  290. for (int n = 5; n < 55; ++n) {
  291. EXPECT_EQ(1, messagesPerLoop.at(n));
  292. }
  293. }
  294. void QueueTest::destroyCallback() {
  295. // Rather than using QueueConsumer, define a separate class for the destroy
  296. // test. The DestroyTestConsumer will delete itself inside the
  297. // messageAvailable() callback. With a regular QueueConsumer this would
  298. // destroy the std::function object while the function is running, which we
  299. // should probably avoid doing. This uses a pointer to a std::function to
  300. // avoid destroying the function object.
  301. class DestroyTestConsumer : public IntQueue::Consumer {
  302. public:
  303. void messageAvailable(int&& value) noexcept override {
  304. DestructorGuard g(this);
  305. if (fn && *fn) {
  306. (*fn)(value);
  307. }
  308. }
  309. std::function<void(int)>* fn;
  310. protected:
  311. ~DestroyTestConsumer() override = default;
  312. };
  313. EventBase eventBase;
  314. // Create a queue and add 2 messages to it
  315. queue.putMessage(1);
  316. queue.putMessage(2);
  317. // Create two QueueConsumers allocated on the heap.
  318. // Have whichever one gets called first destroy both of the QueueConsumers.
  319. // This way one consumer will be destroyed from inside its messageAvailable()
  320. // callback, and one consume will be destroyed when it isn't inside
  321. // messageAvailable().
  322. std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
  323. consumer1(new DestroyTestConsumer);
  324. std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
  325. consumer2(new DestroyTestConsumer);
  326. std::function<void(int)> fn = [&](int) {
  327. consumer1 = nullptr;
  328. consumer2 = nullptr;
  329. };
  330. consumer1->fn = &fn;
  331. consumer2->fn = &fn;
  332. consumer1->startConsuming(&eventBase, &queue);
  333. consumer2->startConsuming(&eventBase, &queue);
  334. // Run the event loop.
  335. eventBase.loop();
  336. // One of the consumers should have fired, received the message,
  337. // then destroyed both consumers.
  338. EXPECT_TRUE(!consumer1);
  339. EXPECT_TRUE(!consumer2);
  340. // One message should be left in the queue
  341. int result = 1;
  342. EXPECT_TRUE(queue.tryConsume(result));
  343. EXPECT_EQ(2, result);
  344. }
  345. TEST(NotificationQueueTest, ConsumeUntilDrained) {
  346. // Basic tests: make sure we
  347. // - drain all the messages
  348. // - ignore any maxReadAtOnce
  349. // - can't add messages during draining
  350. EventBase eventBase;
  351. IntQueue queue;
  352. QueueConsumer consumer;
  353. consumer.fn = [&](int i) {
  354. EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
  355. EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
  356. EXPECT_THROW(queue.putMessage(i), std::runtime_error);
  357. std::vector<int> ints{1, 2, 3};
  358. EXPECT_THROW(
  359. queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
  360. };
  361. consumer.setMaxReadAtOnce(10); // We should ignore this
  362. consumer.startConsuming(&eventBase, &queue);
  363. for (int i = 0; i < 20; i++) {
  364. queue.putMessage(i);
  365. }
  366. EXPECT_TRUE(consumer.consumeUntilDrained());
  367. EXPECT_EQ(20, consumer.messages.size());
  368. // Make sure there can only be one drainer at once
  369. folly::Baton<> callbackBaton, threadStartBaton;
  370. consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
  371. QueueConsumer competingConsumer;
  372. competingConsumer.startConsuming(&eventBase, &queue);
  373. queue.putMessage(1);
  374. atomic<bool> raceA{false};
  375. atomic<bool> raceB{false};
  376. size_t numConsA = 0;
  377. size_t numConsB = 0;
  378. auto thread = std::thread([&] {
  379. threadStartBaton.post();
  380. raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
  381. });
  382. threadStartBaton.wait();
  383. raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
  384. callbackBaton.post();
  385. thread.join();
  386. EXPECT_FALSE(raceA && raceB);
  387. EXPECT_TRUE(raceA || raceB);
  388. EXPECT_TRUE(raceA ^ raceB);
  389. }
  390. TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
  391. for (size_t i = 0; i < 1 << 8; ++i) {
  392. // Basic tests: make sure we
  393. // - drain all the messages
  394. // - ignore any maxReadAtOnce
  395. // - can't add messages during draining
  396. EventBase eventBase;
  397. IntQueue queue;
  398. QueueConsumer consumer;
  399. consumer.fn = [&](int j) {
  400. EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
  401. EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
  402. EXPECT_THROW(queue.putMessage(j), std::runtime_error);
  403. std::vector<int> ints{1, 2, 3};
  404. EXPECT_THROW(
  405. queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
  406. };
  407. consumer.setMaxReadAtOnce(10); // We should ignore this
  408. consumer.startConsuming(&eventBase, &queue);
  409. for (int j = 0; j < 20; j++) {
  410. queue.putMessage(j);
  411. }
  412. EXPECT_TRUE(consumer.consumeUntilDrained());
  413. EXPECT_EQ(20, consumer.messages.size());
  414. // Make sure there can only be one drainer at once
  415. folly::Baton<> callbackBaton, threadStartBaton;
  416. consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
  417. QueueConsumer competingConsumer;
  418. competingConsumer.startConsuming(&eventBase, &queue);
  419. queue.putMessage(1);
  420. atomic<bool> raceA{false};
  421. atomic<bool> raceB{false};
  422. size_t numConsA = 0;
  423. size_t numConsB = 0;
  424. auto thread = std::thread([&] {
  425. threadStartBaton.post();
  426. raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
  427. });
  428. threadStartBaton.wait();
  429. raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
  430. callbackBaton.post();
  431. thread.join();
  432. EXPECT_FALSE(raceA && raceB);
  433. EXPECT_TRUE(raceA || raceB);
  434. EXPECT_TRUE(raceA ^ raceB);
  435. }
  436. }
  437. #ifdef FOLLY_HAVE_EVENTFD
  438. TEST(NotificationQueueTest, SendOneEventFD) {
  439. QueueTest qt(0, IntQueue::FdType::EVENTFD);
  440. qt.sendOne();
  441. }
  442. TEST(NotificationQueueTest, PutMessagesEventFD) {
  443. QueueTest qt(0, IntQueue::FdType::EVENTFD);
  444. qt.sendOne();
  445. }
  446. TEST(NotificationQueueTest, MultiConsumerEventFD) {
  447. QueueTest qt(0, IntQueue::FdType::EVENTFD);
  448. qt.multiConsumer();
  449. }
  450. TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
  451. QueueTest qt(5, IntQueue::FdType::EVENTFD);
  452. qt.maxQueueSize();
  453. }
  454. TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
  455. QueueTest qt(0, IntQueue::FdType::EVENTFD);
  456. qt.maxReadAtOnce();
  457. }
  458. TEST(NotificationQueueTest, DestroyCallbackEventFD) {
  459. QueueTest qt(0, IntQueue::FdType::EVENTFD);
  460. qt.destroyCallback();
  461. }
  462. #endif
  463. TEST(NotificationQueueTest, SendOnePipe) {
  464. QueueTest qt(0, IntQueue::FdType::PIPE);
  465. qt.sendOne();
  466. }
  467. TEST(NotificationQueueTest, PutMessagesPipe) {
  468. QueueTest qt(0, IntQueue::FdType::PIPE);
  469. qt.sendOne();
  470. }
  471. TEST(NotificationQueueTest, MultiConsumerPipe) {
  472. QueueTest qt(0, IntQueue::FdType::PIPE);
  473. qt.multiConsumer();
  474. }
  475. TEST(NotificationQueueTest, MaxQueueSizePipe) {
  476. QueueTest qt(5, IntQueue::FdType::PIPE);
  477. qt.maxQueueSize();
  478. }
  479. TEST(NotificationQueueTest, MaxReadAtOncePipe) {
  480. QueueTest qt(0, IntQueue::FdType::PIPE);
  481. qt.maxReadAtOnce();
  482. }
  483. TEST(NotificationQueueTest, DestroyCallbackPipe) {
  484. QueueTest qt(0, IntQueue::FdType::PIPE);
  485. qt.destroyCallback();
  486. }
  487. #ifndef _WIN32
  488. /*
  489. * Test code that creates a NotificationQueue, then forks, and incorrectly
  490. * tries to send a message to the queue from the child process.
  491. *
  492. * The child process should crash in this scenario, since the child code has a
  493. * bug. (Older versions of NotificationQueue didn't catch this in the child,
  494. * resulting in a crash in the parent process.)
  495. */
  496. TEST(NotificationQueueTest, UseAfterFork) {
  497. IntQueue queue;
  498. int childStatus = 0;
  499. QueueConsumer consumer;
  500. // Boost sets a custom SIGCHLD handler, which fails the test if a child
  501. // process exits abnormally. We don't want this.
  502. signal(SIGCHLD, SIG_DFL);
  503. // Log some info so users reading the test output aren't confused
  504. // by the child process' crash log messages.
  505. LOG(INFO) << "This test makes sure the child process crashes. "
  506. << "Error log messagges and a backtrace are expected.";
  507. {
  508. // Start a separate thread consuming from the queue
  509. ScopedEventBaseThread t1;
  510. t1.getEventBase()->runInEventBaseThread(
  511. [&] { consumer.startConsuming(t1.getEventBase(), &queue); });
  512. // Send a message to it, just for sanity checking
  513. queue.putMessage(1234);
  514. // Fork
  515. pid_t pid = fork();
  516. if (pid == 0) {
  517. // The boost test framework installs signal handlers to catch errors.
  518. // We only want to catch in the parent. In the child let SIGABRT crash
  519. // us normally.
  520. signal(SIGABRT, SIG_DFL);
  521. // Child.
  522. // We're horrible people, so we try to send a message to the queue
  523. // that is being consumed in the parent process.
  524. //
  525. // The putMessage() call should catch this error, and crash our process.
  526. queue.putMessage(9876);
  527. // We shouldn't reach here.
  528. _exit(0);
  529. }
  530. PCHECK(pid > 0);
  531. // Parent. Wait for the child to exit.
  532. auto waited = waitpid(pid, &childStatus, 0);
  533. EXPECT_EQ(pid, waited);
  534. // Send another message to the queue before we terminate the thread.
  535. queue.putMessage(5678);
  536. }
  537. // The child process should have crashed when it tried to call putMessage()
  538. // on our NotificationQueue.
  539. EXPECT_TRUE(WIFSIGNALED(childStatus));
  540. EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
  541. // Make sure the parent saw the expected messages.
  542. // It should have gotten 1234 and 5678 from the parent process, but not
  543. // 9876 from the child.
  544. EXPECT_EQ(2, consumer.messages.size());
  545. EXPECT_EQ(1234, consumer.messages.front());
  546. consumer.messages.pop_front();
  547. EXPECT_EQ(5678, consumer.messages.front());
  548. consumer.messages.pop_front();
  549. }
  550. #endif
  551. TEST(NotificationQueueConsumer, make) {
  552. int value = 0;
  553. EventBase evb;
  554. NotificationQueue<int> queue(32);
  555. auto consumer =
  556. decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });
  557. consumer->startConsuming(&evb, &queue);
  558. int const newValue = 10;
  559. queue.tryPutMessage(newValue);
  560. evb.loopOnce();
  561. EXPECT_EQ(newValue, value);
  562. }