123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- /*
- * Copyright 2015-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <folly/io/async/NotificationQueue.h>
- #include <sys/types.h>
- #include <iostream>
- #include <list>
- #include <thread>
- #include <folly/io/async/ScopedEventBaseThread.h>
- #include <folly/portability/GTest.h>
- #include <folly/synchronization/Baton.h>
- #ifndef _WIN32
- #include <sys/wait.h>
- #endif
- using namespace std;
- using namespace folly;
- typedef NotificationQueue<int> IntQueue;
- class QueueConsumer : public IntQueue::Consumer {
- public:
- QueueConsumer() {}
- void messageAvailable(int&& value) noexcept override {
- messages.push_back(value);
- if (fn) {
- fn(value);
- }
- }
- std::function<void(int)> fn;
- std::deque<int> messages;
- };
- class QueueTest {
- public:
- explicit QueueTest(uint32_t maxSize, IntQueue::FdType type)
- : queue(maxSize, type), terminationQueue(maxSize, type) {}
- void sendOne();
- void putMessages();
- void multiConsumer();
- void maxQueueSize();
- void maxReadAtOnce();
- void destroyCallback();
- void useAfterFork();
- IntQueue queue;
- IntQueue terminationQueue;
- };
- void QueueTest::sendOne() {
- // Create a notification queue and a callback in this thread
- EventBase eventBase;
- QueueConsumer consumer;
- consumer.fn = [&](int) {
- // Stop consuming after we receive 1 message
- consumer.stopConsuming();
- };
- consumer.startConsuming(&eventBase, &queue);
- // Start a new EventBase thread to put a message on our queue
- ScopedEventBaseThread t1;
- t1.getEventBase()->runInEventBaseThread([&] { this->queue.putMessage(5); });
- // Loop until we receive the message
- eventBase.loop();
- const auto& messages = consumer.messages;
- EXPECT_EQ(1, messages.size());
- EXPECT_EQ(5, messages.at(0));
- }
- void QueueTest::putMessages() {
- EventBase eventBase;
- QueueConsumer consumer;
- QueueConsumer consumer2;
- consumer.fn = [&](int msg) {
- // Stop consuming after we receive a message with value 0, and start
- // consumer2
- if (msg == 0) {
- consumer.stopConsuming();
- consumer2.startConsuming(&eventBase, &queue);
- }
- };
- consumer2.fn = [&](int msg) {
- // Stop consuming after we receive a message with value 0
- if (msg == 0) {
- consumer2.stopConsuming();
- }
- };
- consumer.startConsuming(&eventBase, &queue);
- list<int> msgList = {1, 2, 3, 4};
- vector<int> msgVector = {5, 0, 9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 6, 10, 2, 0};
- // Call putMessages() several times to add messages to the queue
- queue.putMessages(msgList.begin(), msgList.end());
- queue.putMessages(msgVector.begin() + 2, msgVector.begin() + 4);
- // Test sending 17 messages, the pipe-based queue calls write in 16 byte
- // chunks
- queue.putMessages(msgVector.begin(), msgVector.end());
- // Loop until the consumer has stopped
- eventBase.loop();
- vector<int> expectedMessages = {1, 2, 3, 4, 9, 8, 7, 5, 0};
- vector<int> expectedMessages2 = {9, 8, 7, 6, 7, 7, 8, 8, 2, 9, 6, 10, 2, 0};
- EXPECT_EQ(expectedMessages.size(), consumer.messages.size());
- for (unsigned int idx = 0; idx < expectedMessages.size(); ++idx) {
- EXPECT_EQ(expectedMessages[idx], consumer.messages.at(idx));
- }
- EXPECT_EQ(expectedMessages2.size(), consumer2.messages.size());
- for (unsigned int idx = 0; idx < expectedMessages2.size(); ++idx) {
- EXPECT_EQ(expectedMessages2[idx], consumer2.messages.at(idx));
- }
- }
- void QueueTest::multiConsumer() {
- uint32_t numConsumers = 8;
- uint32_t numMessages = 10000;
- // Create several consumers each running in their own EventBase thread
- vector<QueueConsumer> consumers(numConsumers);
- vector<ScopedEventBaseThread> threads(numConsumers);
- for (uint32_t consumerIdx = 0; consumerIdx < numConsumers; ++consumerIdx) {
- QueueConsumer* consumer = &consumers[consumerIdx];
- consumer->fn = [consumer, consumerIdx, this](int value) {
- // Treat 0 as a signal to stop.
- if (value == 0) {
- consumer->stopConsuming();
- // Put a message on the terminationQueue to indicate we have stopped
- terminationQueue.putMessage(consumerIdx);
- }
- };
- EventBase* eventBase = threads[consumerIdx].getEventBase();
- eventBase->runInEventBaseThread([eventBase, consumer, this] {
- consumer->startConsuming(eventBase, &queue);
- });
- }
- // Now add a number of messages from this thread
- // Start at 1 rather than 0, since 0 is the signal to stop.
- for (uint32_t n = 1; n < numMessages; ++n) {
- queue.putMessage(n);
- }
- // Now add a 0 for each consumer, to signal them to stop
- for (uint32_t n = 0; n < numConsumers; ++n) {
- queue.putMessage(0);
- }
- // Wait until we get notified that all of the consumers have stopped
- // We use a separate notification queue for this.
- QueueConsumer terminationConsumer;
- vector<uint32_t> consumersStopped(numConsumers, 0);
- uint32_t consumersRemaining = numConsumers;
- terminationConsumer.fn = [&](int consumerIdx) {
- --consumersRemaining;
- if (consumersRemaining == 0) {
- terminationConsumer.stopConsuming();
- }
- EXPECT_GE(consumerIdx, 0);
- EXPECT_LT(consumerIdx, numConsumers);
- ++consumersStopped[consumerIdx];
- };
- EventBase eventBase;
- terminationConsumer.startConsuming(&eventBase, &terminationQueue);
- eventBase.loop();
- // Verify that we saw exactly 1 stop message for each consumer
- for (uint32_t n = 0; n < numConsumers; ++n) {
- EXPECT_EQ(1, consumersStopped[n]);
- }
- // Validate that every message sent to the main queue was received exactly
- // once.
- vector<int> messageCount(numMessages, 0);
- for (uint32_t n = 0; n < numConsumers; ++n) {
- for (int msg : consumers[n].messages) {
- EXPECT_GE(msg, 0);
- EXPECT_LT(msg, numMessages);
- ++messageCount[msg];
- }
- }
- // 0 is the signal to stop, and should have been received once by each
- // consumer
- EXPECT_EQ(numConsumers, messageCount[0]);
- // All other messages should have been received exactly once
- for (uint32_t n = 1; n < numMessages; ++n) {
- EXPECT_EQ(1, messageCount[n]);
- }
- }
- void QueueTest::maxQueueSize() {
- // Create a queue with a maximum size of 5, and fill it up
- for (int n = 0; n < 5; ++n) {
- queue.tryPutMessage(n);
- }
- // Calling tryPutMessage() now should fail
- EXPECT_THROW(queue.tryPutMessage(5), std::overflow_error);
- EXPECT_FALSE(queue.tryPutMessageNoThrow(5));
- int val = 5;
- EXPECT_FALSE(queue.tryPutMessageNoThrow(std::move(val)));
- // Pop a message from the queue
- int result = -1;
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(0, result);
- // We should be able to write another message now that we popped one off.
- queue.tryPutMessage(5);
- // But now we are full again.
- EXPECT_THROW(queue.tryPutMessage(6), std::overflow_error);
- // putMessage() should let us exceed the maximum
- queue.putMessage(6);
- // Pull another mesage off
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(1, result);
- // tryPutMessage() should still fail since putMessage() actually put us over
- // the max.
- EXPECT_THROW(queue.tryPutMessage(7), std::overflow_error);
- // Pull another message off and try again
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(2, result);
- queue.tryPutMessage(7);
- // Now pull all the remaining messages off
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(3, result);
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(4, result);
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(5, result);
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(6, result);
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(7, result);
- // There should be no messages left
- result = -1;
- EXPECT_TRUE(!queue.tryConsume(result));
- EXPECT_EQ(-1, result);
- }
- void QueueTest::maxReadAtOnce() {
- // Add 100 messages to the queue
- for (int n = 0; n < 100; ++n) {
- queue.putMessage(n);
- }
- EventBase eventBase;
- // Record how many messages were processed each loop iteration.
- uint32_t messagesThisLoop = 0;
- std::vector<uint32_t> messagesPerLoop;
- std::function<void()> loopFinished = [&] {
- // Record the current number of messages read this loop
- messagesPerLoop.push_back(messagesThisLoop);
- // Reset messagesThisLoop to 0 for the next loop
- messagesThisLoop = 0;
- // To prevent use-after-free bugs when eventBase destructs,
- // prevent calling runInLoop any more after the test is finished.
- // 55 == number of times loop should run.
- if (messagesPerLoop.size() != 55) {
- // Reschedule ourself to run at the end of the next loop
- eventBase.runInLoop(loopFinished);
- }
- };
- // Schedule the first call to loopFinished
- eventBase.runInLoop(loopFinished);
- QueueConsumer consumer;
- // Read the first 50 messages 10 at a time.
- consumer.setMaxReadAtOnce(10);
- consumer.fn = [&](int value) {
- ++messagesThisLoop;
- // After 50 messages, drop to reading only 1 message at a time.
- if (value == 50) {
- consumer.setMaxReadAtOnce(1);
- }
- // Terminate the loop when we reach the end of the messages.
- if (value == 99) {
- eventBase.terminateLoopSoon();
- }
- };
- consumer.startConsuming(&eventBase, &queue);
- // Run the event loop until the consumer terminates it
- eventBase.loop();
- // The consumer should have read all 100 messages in order
- EXPECT_EQ(100, consumer.messages.size());
- for (int n = 0; n < 100; ++n) {
- EXPECT_EQ(n, consumer.messages.at(n));
- }
- // Currently EventBase happens to still run the loop callbacks even after
- // terminateLoopSoon() is called. However, we don't really want to depend on
- // this behavior. In case this ever changes in the future, add
- // messagesThisLoop to messagesPerLoop in loop callback isn't invoked for the
- // last loop iteration.
- if (messagesThisLoop > 0) {
- messagesPerLoop.push_back(messagesThisLoop);
- messagesThisLoop = 0;
- }
- // For the first 5 loops it should have read 10 messages each time.
- // After that it should have read 1 messages per loop for the next 50 loops.
- EXPECT_EQ(55, messagesPerLoop.size());
- for (int n = 0; n < 5; ++n) {
- EXPECT_EQ(10, messagesPerLoop.at(n));
- }
- for (int n = 5; n < 55; ++n) {
- EXPECT_EQ(1, messagesPerLoop.at(n));
- }
- }
- void QueueTest::destroyCallback() {
- // Rather than using QueueConsumer, define a separate class for the destroy
- // test. The DestroyTestConsumer will delete itself inside the
- // messageAvailable() callback. With a regular QueueConsumer this would
- // destroy the std::function object while the function is running, which we
- // should probably avoid doing. This uses a pointer to a std::function to
- // avoid destroying the function object.
- class DestroyTestConsumer : public IntQueue::Consumer {
- public:
- void messageAvailable(int&& value) noexcept override {
- DestructorGuard g(this);
- if (fn && *fn) {
- (*fn)(value);
- }
- }
- std::function<void(int)>* fn;
- protected:
- ~DestroyTestConsumer() override = default;
- };
- EventBase eventBase;
- // Create a queue and add 2 messages to it
- queue.putMessage(1);
- queue.putMessage(2);
- // Create two QueueConsumers allocated on the heap.
- // Have whichever one gets called first destroy both of the QueueConsumers.
- // This way one consumer will be destroyed from inside its messageAvailable()
- // callback, and one consume will be destroyed when it isn't inside
- // messageAvailable().
- std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
- consumer1(new DestroyTestConsumer);
- std::unique_ptr<DestroyTestConsumer, DelayedDestruction::Destructor>
- consumer2(new DestroyTestConsumer);
- std::function<void(int)> fn = [&](int) {
- consumer1 = nullptr;
- consumer2 = nullptr;
- };
- consumer1->fn = &fn;
- consumer2->fn = &fn;
- consumer1->startConsuming(&eventBase, &queue);
- consumer2->startConsuming(&eventBase, &queue);
- // Run the event loop.
- eventBase.loop();
- // One of the consumers should have fired, received the message,
- // then destroyed both consumers.
- EXPECT_TRUE(!consumer1);
- EXPECT_TRUE(!consumer2);
- // One message should be left in the queue
- int result = 1;
- EXPECT_TRUE(queue.tryConsume(result));
- EXPECT_EQ(2, result);
- }
- TEST(NotificationQueueTest, ConsumeUntilDrained) {
- // Basic tests: make sure we
- // - drain all the messages
- // - ignore any maxReadAtOnce
- // - can't add messages during draining
- EventBase eventBase;
- IntQueue queue;
- QueueConsumer consumer;
- consumer.fn = [&](int i) {
- EXPECT_THROW(queue.tryPutMessage(i), std::runtime_error);
- EXPECT_FALSE(queue.tryPutMessageNoThrow(i));
- EXPECT_THROW(queue.putMessage(i), std::runtime_error);
- std::vector<int> ints{1, 2, 3};
- EXPECT_THROW(
- queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
- };
- consumer.setMaxReadAtOnce(10); // We should ignore this
- consumer.startConsuming(&eventBase, &queue);
- for (int i = 0; i < 20; i++) {
- queue.putMessage(i);
- }
- EXPECT_TRUE(consumer.consumeUntilDrained());
- EXPECT_EQ(20, consumer.messages.size());
- // Make sure there can only be one drainer at once
- folly::Baton<> callbackBaton, threadStartBaton;
- consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
- QueueConsumer competingConsumer;
- competingConsumer.startConsuming(&eventBase, &queue);
- queue.putMessage(1);
- atomic<bool> raceA{false};
- atomic<bool> raceB{false};
- size_t numConsA = 0;
- size_t numConsB = 0;
- auto thread = std::thread([&] {
- threadStartBaton.post();
- raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
- });
- threadStartBaton.wait();
- raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
- callbackBaton.post();
- thread.join();
- EXPECT_FALSE(raceA && raceB);
- EXPECT_TRUE(raceA || raceB);
- EXPECT_TRUE(raceA ^ raceB);
- }
- TEST(NotificationQueueTest, ConsumeUntilDrainedStress) {
- for (size_t i = 0; i < 1 << 8; ++i) {
- // Basic tests: make sure we
- // - drain all the messages
- // - ignore any maxReadAtOnce
- // - can't add messages during draining
- EventBase eventBase;
- IntQueue queue;
- QueueConsumer consumer;
- consumer.fn = [&](int j) {
- EXPECT_THROW(queue.tryPutMessage(j), std::runtime_error);
- EXPECT_FALSE(queue.tryPutMessageNoThrow(j));
- EXPECT_THROW(queue.putMessage(j), std::runtime_error);
- std::vector<int> ints{1, 2, 3};
- EXPECT_THROW(
- queue.putMessages(ints.begin(), ints.end()), std::runtime_error);
- };
- consumer.setMaxReadAtOnce(10); // We should ignore this
- consumer.startConsuming(&eventBase, &queue);
- for (int j = 0; j < 20; j++) {
- queue.putMessage(j);
- }
- EXPECT_TRUE(consumer.consumeUntilDrained());
- EXPECT_EQ(20, consumer.messages.size());
- // Make sure there can only be one drainer at once
- folly::Baton<> callbackBaton, threadStartBaton;
- consumer.fn = [&](int /* i */) { callbackBaton.wait(); };
- QueueConsumer competingConsumer;
- competingConsumer.startConsuming(&eventBase, &queue);
- queue.putMessage(1);
- atomic<bool> raceA{false};
- atomic<bool> raceB{false};
- size_t numConsA = 0;
- size_t numConsB = 0;
- auto thread = std::thread([&] {
- threadStartBaton.post();
- raceB = consumer.consumeUntilDrained(&numConsB) && numConsB;
- });
- threadStartBaton.wait();
- raceA = competingConsumer.consumeUntilDrained(&numConsA) && numConsA;
- callbackBaton.post();
- thread.join();
- EXPECT_FALSE(raceA && raceB);
- EXPECT_TRUE(raceA || raceB);
- EXPECT_TRUE(raceA ^ raceB);
- }
- }
- #ifdef FOLLY_HAVE_EVENTFD
- TEST(NotificationQueueTest, SendOneEventFD) {
- QueueTest qt(0, IntQueue::FdType::EVENTFD);
- qt.sendOne();
- }
- TEST(NotificationQueueTest, PutMessagesEventFD) {
- QueueTest qt(0, IntQueue::FdType::EVENTFD);
- qt.sendOne();
- }
- TEST(NotificationQueueTest, MultiConsumerEventFD) {
- QueueTest qt(0, IntQueue::FdType::EVENTFD);
- qt.multiConsumer();
- }
- TEST(NotificationQueueTest, MaxQueueSizeEventFD) {
- QueueTest qt(5, IntQueue::FdType::EVENTFD);
- qt.maxQueueSize();
- }
- TEST(NotificationQueueTest, MaxReadAtOnceEventFD) {
- QueueTest qt(0, IntQueue::FdType::EVENTFD);
- qt.maxReadAtOnce();
- }
- TEST(NotificationQueueTest, DestroyCallbackEventFD) {
- QueueTest qt(0, IntQueue::FdType::EVENTFD);
- qt.destroyCallback();
- }
- #endif
- TEST(NotificationQueueTest, SendOnePipe) {
- QueueTest qt(0, IntQueue::FdType::PIPE);
- qt.sendOne();
- }
- TEST(NotificationQueueTest, PutMessagesPipe) {
- QueueTest qt(0, IntQueue::FdType::PIPE);
- qt.sendOne();
- }
- TEST(NotificationQueueTest, MultiConsumerPipe) {
- QueueTest qt(0, IntQueue::FdType::PIPE);
- qt.multiConsumer();
- }
- TEST(NotificationQueueTest, MaxQueueSizePipe) {
- QueueTest qt(5, IntQueue::FdType::PIPE);
- qt.maxQueueSize();
- }
- TEST(NotificationQueueTest, MaxReadAtOncePipe) {
- QueueTest qt(0, IntQueue::FdType::PIPE);
- qt.maxReadAtOnce();
- }
- TEST(NotificationQueueTest, DestroyCallbackPipe) {
- QueueTest qt(0, IntQueue::FdType::PIPE);
- qt.destroyCallback();
- }
- #ifndef _WIN32
- /*
- * Test code that creates a NotificationQueue, then forks, and incorrectly
- * tries to send a message to the queue from the child process.
- *
- * The child process should crash in this scenario, since the child code has a
- * bug. (Older versions of NotificationQueue didn't catch this in the child,
- * resulting in a crash in the parent process.)
- */
- TEST(NotificationQueueTest, UseAfterFork) {
- IntQueue queue;
- int childStatus = 0;
- QueueConsumer consumer;
- // Boost sets a custom SIGCHLD handler, which fails the test if a child
- // process exits abnormally. We don't want this.
- signal(SIGCHLD, SIG_DFL);
- // Log some info so users reading the test output aren't confused
- // by the child process' crash log messages.
- LOG(INFO) << "This test makes sure the child process crashes. "
- << "Error log messagges and a backtrace are expected.";
- {
- // Start a separate thread consuming from the queue
- ScopedEventBaseThread t1;
- t1.getEventBase()->runInEventBaseThread(
- [&] { consumer.startConsuming(t1.getEventBase(), &queue); });
- // Send a message to it, just for sanity checking
- queue.putMessage(1234);
- // Fork
- pid_t pid = fork();
- if (pid == 0) {
- // The boost test framework installs signal handlers to catch errors.
- // We only want to catch in the parent. In the child let SIGABRT crash
- // us normally.
- signal(SIGABRT, SIG_DFL);
- // Child.
- // We're horrible people, so we try to send a message to the queue
- // that is being consumed in the parent process.
- //
- // The putMessage() call should catch this error, and crash our process.
- queue.putMessage(9876);
- // We shouldn't reach here.
- _exit(0);
- }
- PCHECK(pid > 0);
- // Parent. Wait for the child to exit.
- auto waited = waitpid(pid, &childStatus, 0);
- EXPECT_EQ(pid, waited);
- // Send another message to the queue before we terminate the thread.
- queue.putMessage(5678);
- }
- // The child process should have crashed when it tried to call putMessage()
- // on our NotificationQueue.
- EXPECT_TRUE(WIFSIGNALED(childStatus));
- EXPECT_EQ(SIGABRT, WTERMSIG(childStatus));
- // Make sure the parent saw the expected messages.
- // It should have gotten 1234 and 5678 from the parent process, but not
- // 9876 from the child.
- EXPECT_EQ(2, consumer.messages.size());
- EXPECT_EQ(1234, consumer.messages.front());
- consumer.messages.pop_front();
- EXPECT_EQ(5678, consumer.messages.front());
- consumer.messages.pop_front();
- }
- #endif
- TEST(NotificationQueueConsumer, make) {
- int value = 0;
- EventBase evb;
- NotificationQueue<int> queue(32);
- auto consumer =
- decltype(queue)::Consumer::make([&](int&& msg) noexcept { value = msg; });
- consumer->startConsuming(&evb, &queue);
- int const newValue = 10;
- queue.tryPutMessage(newValue);
- evb.loopOnce();
- EXPECT_EQ(newValue, value);
- }
|