EventHandlerTest.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <bitset>
  17. #include <future>
  18. #include <thread>
  19. #include <folly/MPMCQueue.h>
  20. #include <folly/ScopeGuard.h>
  21. #include <folly/io/async/EventBase.h>
  22. #include <folly/io/async/EventHandler.h>
  23. #include <folly/portability/GMock.h>
  24. #include <folly/portability/GTest.h>
  25. #include <folly/portability/Sockets.h>
  26. #include <sys/eventfd.h>
  27. using namespace std;
  28. using namespace folly;
  29. using namespace testing;
  30. void runInThreadsAndWait(size_t nthreads, function<void(size_t)> cb) {
  31. vector<thread> threads(nthreads);
  32. for (size_t i = 0; i < nthreads; ++i) {
  33. threads[i] = thread(cb, i);
  34. }
  35. for (size_t i = 0; i < nthreads; ++i) {
  36. threads[i].join();
  37. }
  38. }
  39. void runInThreadsAndWait(vector<function<void()>> cbs) {
  40. runInThreadsAndWait(cbs.size(), [&](size_t k) { cbs[k](); });
  41. }
  42. class EventHandlerMock : public EventHandler {
  43. public:
  44. EventHandlerMock(EventBase* eb, int fd) : EventHandler(eb, fd) {}
  45. // gmock can't mock noexcept methods, so we need an intermediary
  46. MOCK_METHOD1(_handlerReady, void(uint16_t));
  47. void handlerReady(uint16_t events) noexcept override {
  48. _handlerReady(events);
  49. }
  50. };
  51. class EventHandlerTest : public Test {
  52. public:
  53. int efd = 0;
  54. void SetUp() override {
  55. efd = eventfd(0, EFD_SEMAPHORE);
  56. ASSERT_THAT(efd, Gt(0));
  57. }
  58. void TearDown() override {
  59. if (efd > 0) {
  60. close(efd);
  61. }
  62. efd = 0;
  63. }
  64. void efd_write(uint64_t val) {
  65. write(efd, &val, sizeof(val));
  66. }
  67. uint64_t efd_read() {
  68. uint64_t val = 0;
  69. read(efd, &val, sizeof(val));
  70. return val;
  71. }
  72. };
  73. TEST_F(EventHandlerTest, simple) {
  74. const size_t writes = 4;
  75. size_t readsRemaining = writes;
  76. EventBase eb;
  77. EventHandlerMock eh(&eb, efd);
  78. eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  79. EXPECT_CALL(eh, _handlerReady(_))
  80. .Times(writes)
  81. .WillRepeatedly(Invoke([&](uint16_t /* events */) {
  82. efd_read();
  83. if (--readsRemaining == 0) {
  84. eh.unregisterHandler();
  85. }
  86. }));
  87. efd_write(writes);
  88. eb.loop();
  89. EXPECT_EQ(0, readsRemaining);
  90. }
  91. TEST_F(EventHandlerTest, many_concurrent_producers) {
  92. const size_t writes = 200;
  93. const size_t nproducers = 20;
  94. size_t readsRemaining = writes;
  95. runInThreadsAndWait({
  96. [&] {
  97. EventBase eb;
  98. EventHandlerMock eh(&eb, efd);
  99. eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  100. EXPECT_CALL(eh, _handlerReady(_))
  101. .Times(writes)
  102. .WillRepeatedly(Invoke([&](uint16_t /* events */) {
  103. efd_read();
  104. if (--readsRemaining == 0) {
  105. eh.unregisterHandler();
  106. }
  107. }));
  108. eb.loop();
  109. },
  110. [&] {
  111. runInThreadsAndWait(nproducers, [&](size_t /* k */) {
  112. for (size_t i = 0; i < writes / nproducers; ++i) {
  113. this_thread::sleep_for(std::chrono::milliseconds(1));
  114. efd_write(1);
  115. }
  116. });
  117. },
  118. });
  119. EXPECT_EQ(0, readsRemaining);
  120. }
  121. TEST_F(EventHandlerTest, many_concurrent_consumers) {
  122. const size_t writes = 200;
  123. const size_t nproducers = 8;
  124. const size_t nconsumers = 20;
  125. atomic<size_t> writesRemaining(writes);
  126. atomic<size_t> readsRemaining(writes);
  127. MPMCQueue<nullptr_t> queue(writes / 10);
  128. runInThreadsAndWait({
  129. [&] {
  130. runInThreadsAndWait(nconsumers, [&](size_t /* k */) {
  131. size_t thReadsRemaining = writes / nconsumers;
  132. EventBase eb;
  133. EventHandlerMock eh(&eb, efd);
  134. eh.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  135. EXPECT_CALL(eh, _handlerReady(_))
  136. .WillRepeatedly(Invoke([&](uint16_t /* events */) {
  137. nullptr_t val;
  138. if (!queue.readIfNotEmpty(val)) {
  139. return;
  140. }
  141. efd_read();
  142. --readsRemaining;
  143. if (--thReadsRemaining == 0) {
  144. eh.unregisterHandler();
  145. }
  146. }));
  147. eb.loop();
  148. });
  149. },
  150. [&] {
  151. runInThreadsAndWait(nproducers, [&](size_t /* k */) {
  152. for (size_t i = 0; i < writes / nproducers; ++i) {
  153. this_thread::sleep_for(std::chrono::milliseconds(1));
  154. queue.blockingWrite(nullptr);
  155. efd_write(1);
  156. --writesRemaining;
  157. }
  158. });
  159. },
  160. });
  161. EXPECT_EQ(0, writesRemaining);
  162. EXPECT_EQ(0, readsRemaining);
  163. }
  164. #ifdef EV_PRI
//
// See RFC 6093 for an extensive discussion of TCP URG semantics.
// Specifically, it points out that the URG mechanism was never intended to
// be used for out-of-band information delivery. However, pretty much every
// implementation interprets the LAST octet of urgent data as the
// OOB byte.
//
  172. class EventHandlerOobTest : public ::testing::Test {
  173. public:
  174. //
  175. // Wait for port number to connect to, then connect and invoke
  176. // clientOps(fd) where fd is the connection file descriptor
  177. //
  178. void runClient(std::function<void(int fd)> clientOps) {
  179. clientThread = std::thread([serverPortFuture = serverReady.get_future(),
  180. clientOps]() mutable {
  181. int clientFd = socket(AF_INET, SOCK_STREAM, 0);
  182. SCOPE_EXIT {
  183. close(clientFd);
  184. };
  185. struct hostent* he{nullptr};
  186. struct sockaddr_in server;
  187. std::array<const char, 10> hostname = {"localhost"};
  188. he = gethostbyname(hostname.data());
  189. PCHECK(he);
  190. memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
  191. server.sin_family = AF_INET;
  192. // block here until port is known
  193. server.sin_port = serverPortFuture.get();
  194. LOG(INFO) << "Server is ready";
  195. PCHECK(
  196. ::connect(clientFd, (struct sockaddr*)&server, sizeof(server)) == 0);
  197. LOG(INFO) << "Server connection available";
  198. clientOps(clientFd);
  199. });
  200. }
  201. //
  202. // Bind, get port number, pass it to client, listen/accept and store the
  203. // accepted fd
  204. //
  205. void acceptConn() {
  206. // make the server.
  207. int listenfd = socket(AF_INET, SOCK_STREAM, 0);
  208. SCOPE_EXIT {
  209. close(listenfd);
  210. };
  211. PCHECK(listenfd != -1) << "unable to open socket";
  212. struct sockaddr_in sin;
  213. sin.sin_port = htons(0);
  214. sin.sin_addr.s_addr = INADDR_ANY;
  215. sin.sin_family = AF_INET;
  216. PCHECK(bind(listenfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
  217. << "Can't bind to port";
  218. listen(listenfd, 5);
  219. struct sockaddr_in findSockName;
  220. socklen_t sz = sizeof(findSockName);
  221. getsockname(listenfd, (struct sockaddr*)&findSockName, &sz);
  222. serverReady.set_value(findSockName.sin_port);
  223. struct sockaddr_in cli_addr;
  224. socklen_t clilen = sizeof(cli_addr);
  225. serverFd = accept(listenfd, (struct sockaddr*)&cli_addr, &clilen);
  226. PCHECK(serverFd >= 0) << "can't accept";
  227. }
  228. void SetUp() override {}
  229. void TearDown() override {
  230. clientThread.join();
  231. close(serverFd);
  232. }
  233. EventBase eb;
  234. std::thread clientThread;
  235. std::promise<decltype(sockaddr_in::sin_port)> serverReady;
  236. int serverFd{-1};
  237. };
  238. //
  239. // Test that sending OOB data is detected by event handler
  240. //
  241. TEST_F(EventHandlerOobTest, EPOLLPRI) {
  242. auto clientOps = [](int fd) {
  243. char buffer[] = "banana";
  244. int n = send(fd, buffer, strlen(buffer) + 1, MSG_OOB);
  245. LOG(INFO) << "Client send finished";
  246. PCHECK(n > 0);
  247. };
  248. runClient(clientOps);
  249. acceptConn();
  250. struct SockEvent : public EventHandler {
  251. SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
  252. void handlerReady(uint16_t events) noexcept override {
  253. EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
  254. std::array<char, 255> buffer;
  255. int n = read(fd_, buffer.data(), buffer.size());
  256. //
  257. // NB: we sent 7 bytes, but only received 6. The last byte
  258. // has been stored in the OOB buffer.
  259. //
  260. EXPECT_EQ(6, n);
  261. EXPECT_EQ("banana", std::string(buffer.data(), 6));
  262. // now read the byte stored in OOB buffer
  263. n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
  264. EXPECT_EQ(1, n);
  265. }
  266. private:
  267. int fd_;
  268. } sockHandler(&eb, serverFd);
  269. sockHandler.registerHandler(EventHandler::EventFlags::PRI);
  270. LOG(INFO) << "Registered Handler";
  271. eb.loop();
  272. }
  273. //
  274. // Test if we can send an OOB byte and then normal data
  275. //
  276. TEST_F(EventHandlerOobTest, OOB_AND_NORMAL_DATA) {
  277. auto clientOps = [](int sockfd) {
  278. {
  279. // OOB buffer can only hold one byte in most implementations
  280. std::array<char, 2> buffer = {"X"};
  281. int n = send(sockfd, buffer.data(), 1, MSG_OOB);
  282. PCHECK(n > 0);
  283. }
  284. {
  285. std::array<char, 7> buffer = {"banana"};
  286. int n = send(sockfd, buffer.data(), buffer.size(), 0);
  287. PCHECK(n > 0);
  288. }
  289. };
  290. runClient(clientOps);
  291. acceptConn();
  292. struct SockEvent : public EventHandler {
  293. SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), eb_(eb), fd_(fd) {}
  294. void handlerReady(uint16_t events) noexcept override {
  295. std::array<char, 255> buffer;
  296. if (events & EventHandler::EventFlags::PRI) {
  297. int n = recv(fd_, buffer.data(), buffer.size(), MSG_OOB);
  298. EXPECT_EQ(1, n);
  299. EXPECT_EQ("X", std::string(buffer.data(), 1));
  300. registerHandler(EventHandler::EventFlags::READ);
  301. return;
  302. }
  303. if (events & EventHandler::EventFlags::READ) {
  304. int n = recv(fd_, buffer.data(), buffer.size(), 0);
  305. EXPECT_EQ(7, n);
  306. EXPECT_EQ("banana", std::string(buffer.data()));
  307. eb_->terminateLoopSoon();
  308. return;
  309. }
  310. }
  311. private:
  312. EventBase* eb_;
  313. int fd_;
  314. } sockHandler(&eb, serverFd);
  315. sockHandler.registerHandler(
  316. EventHandler::EventFlags::PRI | EventHandler::EventFlags::READ);
  317. LOG(INFO) << "Registered Handler";
  318. eb.loopForever();
  319. }
  320. //
  321. // Demonstrate that "regular" reads ignore the OOB byte sent to us
  322. //
  323. TEST_F(EventHandlerOobTest, SWALLOW_OOB) {
  324. auto clientOps = [](int sockfd) {
  325. {
  326. std::array<char, 2> buffer = {"X"};
  327. int n = send(sockfd, buffer.data(), 1, MSG_OOB);
  328. PCHECK(n > 0);
  329. }
  330. {
  331. std::array<char, 7> buffer = {"banana"};
  332. int n = send(sockfd, buffer.data(), buffer.size(), 0);
  333. PCHECK(n > 0);
  334. }
  335. };
  336. runClient(clientOps);
  337. acceptConn();
  338. struct SockEvent : public EventHandler {
  339. SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
  340. void handlerReady(uint16_t events) noexcept override {
  341. std::array<char, 255> buffer;
  342. ASSERT_TRUE(events & EventHandler::EventFlags::READ);
  343. int n = recv(fd_, buffer.data(), buffer.size(), 0);
  344. EXPECT_EQ(7, n);
  345. EXPECT_EQ("banana", std::string(buffer.data()));
  346. }
  347. private:
  348. int fd_;
  349. } sockHandler(&eb, serverFd);
  350. sockHandler.registerHandler(EventHandler::EventFlags::READ);
  351. LOG(INFO) << "Registered Handler";
  352. eb.loop();
  353. }
  354. #endif