123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- /*
- * Copyright 2014-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <bitset>
- #include <future>
- #include <thread>
- #include <folly/MPMCQueue.h>
- #include <folly/ScopeGuard.h>
- #include <folly/io/async/EventBase.h>
- #include <folly/io/async/EventHandler.h>
- #include <folly/portability/GMock.h>
- #include <folly/portability/GTest.h>
- #include <folly/portability/Sockets.h>
- #include <sys/eventfd.h>
- using namespace std;
- using namespace folly;
- using namespace testing;
// Launches `nthreads` threads, each of which invokes a copy of `cb` with its
// own zero-based index, and blocks until every thread has finished.
void runInThreadsAndWait(size_t nthreads, function<void(size_t)> cb) {
  vector<thread> workers;
  workers.reserve(nthreads);
  for (size_t idx = 0; idx < nthreads; ++idx) {
    workers.emplace_back(cb, idx);
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
- void runInThreadsAndWait(vector<function<void()>> cbs) {
- runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
- }
- class EventHandlerMock : public EventHandler {
- public:
- EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
- // gmock can't mock noexcept methods, so we need an intermediary
- MOCK_METHOD1(_handlerReady, void(uint16_t));
- void handlerReady(uint16_t events) noexcept override {
- _handlerReady(events);
- }
- };
- class EventHandlerTest : public Test {
- public:
- int efd = 0;
- void SetUp() override {
- efd = eventfd(0, EFD_SEMAPHORE);
- ASSERT_THAT(efd, Gt(0));
- }
- void TearDown() override {
- if (efd > 0) {
- close(efd);
- }
- efd = 0;
- }
- void efd_write(uint64_t val) {
- write(efd, &val, sizeof(val));
- }
- uint64_t efd_read() {
- uint64_t val = 0;
- read(efd, &val, sizeof(val));
- return val;
- }
- };
- TEST_F(EventHandlerTest, simple) {
- const size_t writes = 4;
- size_t readsRemaining = writes;
- EventBase eb;
- EventHandlerMock eh(&eb, efd);
- eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- EXPECT_CALL(eh, _handlerReady(_))
- .Times(writes)
- .WillRepeatedly(Invoke([&](uint16_t /* events */) {
- efd_read();
- if (--readsRemaining == 0) {
- eh.unregisterHandler();
- }
- }));
- efd_write(writes);
- eb.loop();
- EXPECT_EQ(0, readsRemaining);
- }
- TEST_F(EventHandlerTest, many_concurrent_producers) {
- const size_t writes = 200;
- const size_t nproducers = 20;
- size_t readsRemaining = writes;
- runInThreadsAndWait({
- [&] {
- EventBase eb;
- EventHandlerMock eh(&eb, efd);
- eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- EXPECT_CALL(eh, _handlerReady(_))
- .Times(writes)
- .WillRepeatedly(Invoke([&](uint16_t /* events */) {
- efd_read();
- if (--readsRemaining == 0) {
- eh.unregisterHandler();
- }
- }));
- eb.loop();
- },
- [&] {
- runInThreadsAndWait(nproducers, [&](size_t /* k */) {
- for (size_t i = 0; i < writes / nproducers; ++i) {
- this_thread::sleep_for(std::chrono::milliseconds(1));
- efd_write(1);
- }
- });
- },
- });
- EXPECT_EQ(0, readsRemaining);
- }
- TEST_F(EventHandlerTest, many_concurrent_consumers) {
- const size_t writes = 200;
- const size_t nproducers = 8;
- const size_t nconsumers = 20;
- atomic<size_t> writesRemaining(writes);
- atomic<size_t> readsRemaining(writes);
- MPMCQueue<nullptr_t> queue(writes / 10);
- runInThreadsAndWait({
- [&] {
- runInThreadsAndWait(nconsumers, [&](size_t /* k */) {
- size_t thReadsRemaining = writes / nconsumers;
- EventBase eb;
- EventHandlerMock eh(&eb, efd);
- eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- EXPECT_CALL(eh, _handlerReady(_))
- .WillRepeatedly(Invoke([&](uint16_t /* events */) {
- nullptr_t val;
- if (!queue.readIfNotEmpty(val)) {
- return;
- }
- efd_read();
- --readsRemaining;
- if (--thReadsRemaining == 0) {
- eh.unregisterHandler();
- }
- }));
- eb.loop();
- });
- },
- [&] {
- runInThreadsAndWait(nproducers, [&](size_t /* k */) {
- for (size_t i = 0; i < writes / nproducers; ++i) {
- this_thread::sleep_for(std::chrono::milliseconds(1));
- queue.blockingWrite(nullptr);
- efd_write(1);
- --writesRemaining;
- }
- });
- },
- });
- EXPECT_EQ(0, writesRemaining);
- EXPECT_EQ(0, readsRemaining);
- }
- #ifdef EV_PRI
- //
- // See rfc6093 for extensive discussion on TCP URG semantics. Specificaly,
- // it points out that URG mechanism was never intended to be used
- // for out-of-band information delivery. However, pretty much every
- // implementation interprets the LAST octect or urgent data as the
- // OOB byte.
- //
- class EventHandlerOobTest : public ::testing::Test {
- public:
- //
- // Wait for port number to connect to, then connect and invoke
- // clientOps(fd) where fd is the connection file descriptor
- //
- void runClient(std::function<void(int fd)> clientOps) {
- clientThread = std::thread([serverPortFuture = serverReady.get_future(),
- clientOps]() mutable {
- int clientFd = socket(AF_INET, SOCK_STREAM, 0);
- SCOPE_EXIT {
- close(clientFd);
- };
- struct hostent* he{nullptr};
- struct sockaddr_in server;
- std::array<const char, 10> hostname = {"localhost"};
- he = gethostbyname(hostname.data());
- PCHECK(he);
- memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
- server.sin_family = AF_INET;
- // block here until port is known
- server.sin_port = serverPortFuture.get();
- LOG(INFO) << "Server is ready";
- PCHECK(
- ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) == 0);
- LOG(INFO) << "Server connection available";
- clientOps(clientFd);
- });
- }
- //
- // Bind, get port number, pass it to client, listen/accept and store the
- // accepted fd
- //
- void acceptConn() {
- // make the server.
- int listenfd = socket(AF_INET, SOCK_STREAM, 0);
- SCOPE_EXIT {
- close(listenfd);
- };
- PCHECK(listenfd != -1) << "unable to open socket";
- struct sockaddr_in sin;
- sin.sin_port = htons(0);
- sin.sin_addr.s_addr = INADDR_ANY;
- sin.sin_family = AF_INET;
- PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
- << "Can't bind to port";
- listen(listenfd, 5);
- struct sockaddr_in findSockName;
- socklen_t sz = sizeof(findSockName);
- getsockname(listenfd, (struct sockaddr*)&findSockName, &sz);
- serverReady.set_value(findSockName.sin_port);
- struct sockaddr_in cli_addr;
- socklen_t clilen = sizeof(cli_addr);
- serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
- PCHECK(serverFd >= 0) << "can't accept";
- }
- void SetUp() override {}
- void TearDown() override {
- clientThread.join();
- close(serverFd);
- }
- EventBase eb;
- std::thread clientThread;
- std::promise<decltype(sockaddr_in::sin_port)> serverReady;
- int serverFd{-1};
- };
- //
- // Test that sending OOB data is detected by event handler
- //
- TEST_F(EventHandlerOobTest, EPOLLPRI) {
- auto clientOps = [](int fd) {
- char buffer[] = "banana";
- int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
- LOG(INFO) << "Client send finished";
- PCHECK(n > 0);
- };
- runClient(clientOps);
- acceptConn();
- struct SockEvent : public EventHandler {
- SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
- void handlerReady(uint16_t events) noexcept override {
- EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
- std::array<char, 255> buffer;
- int n = read(fd_, buffer.data(), buffer.size());
- //
- // NB: we sent 7 bytes, but only received 6. The last byte
- // has been stored in the OOB buffer.
- //
- EXPECT_EQ(6, n);
- EXPECT_EQ("banana", std::string(buffer.data(), 6));
- // now read the byte stored in OOB buffer
- n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
- EXPECT_EQ(1, n);
- }
- private:
- int fd_;
- } sockHandler(&eb, serverFd);
- sockHandler.registerHandler(EventHandler::EventFlags::PRI);
- LOG(INFO) << "Registered Handler";
- eb.loop();
- }
- //
- // Test if we can send an OOB byte and then normal data
- //
- TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
- auto clientOps = [](int sockfd) {
- {
- // OOB buffer can only hold one byte in most implementations
- std::array<char, 2> buffer = {"X"};
- int n = send(sockfd, buffer.data(), 1, MSG_OOB);
- PCHECK(n > 0);
- }
- {
- std::array<char, 7> buffer = {"banana"};
- int n = send(sockfd, buffer.data(), buffer.size(), 0);
- PCHECK(n > 0);
- }
- };
- runClient(clientOps);
- acceptConn();
- struct SockEvent : public EventHandler {
- SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {}
- void handlerReady(uint16_t events) noexcept override {
- std::array<char, 255> buffer;
- if (events & EventHandler::EventFlags::PRI) {
- int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
- EXPECT_EQ(1, n);
- EXPECT_EQ("X", std::string(buffer.data(), 1));
- registerHandler(EventHandler::EventFlags::READ);
- return;
- }
- if (events & EventHandler::EventFlags::READ) {
- int n = recv(fd_, buffer.data(), buffer.size(), 0);
- EXPECT_EQ(7, n);
- EXPECT_EQ("banana", std::string(buffer.data()));
- eb_->terminateLoopSoon();
- return;
- }
- }
- private:
- EventBase* eb_;
- int fd_;
- } sockHandler(&eb, serverFd);
- sockHandler.registerHandler(
- EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
- LOG(INFO) << "Registered Handler";
- eb.loopForever();
- }
- //
- // Demonstrate that "regular" reads ignore the OOB byte sent to us
- //
- TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
- auto clientOps = [](int sockfd) {
- {
- std::array<char, 2> buffer = {"X"};
- int n = send(sockfd, buffer.data(), 1, MSG_OOB);
- PCHECK(n > 0);
- }
- {
- std::array<char, 7> buffer = {"banana"};
- int n = send(sockfd, buffer.data(), buffer.size(), 0);
- PCHECK(n > 0);
- }
- };
- runClient(clientOps);
- acceptConn();
- struct SockEvent : public EventHandler {
- SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
- void handlerReady(uint16_t events) noexcept override {
- std::array<char, 255> buffer;
- ASSERT_TRUE(events & EventHandler::EventFlags::READ);
- int n = recv(fd_, buffer.data(), buffer.size(), 0);
- EXPECT_EQ(7, n);
- EXPECT_EQ("banana", std::string(buffer.data()));
- }
- private:
- int fd_;
- } sockHandler(&eb, serverFd);
- sockHandler.registerHandler(EventHandler::EventFlags::READ);
- LOG(INFO) << "Registered Handler";
- eb.loop();
- }
- #endif
|