AsyncUDPSocketTest.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <thread>
  17. #include <folly/Conv.h>
  18. #include <folly/SocketAddress.h>
  19. #include <folly/String.h>
  20. #include <folly/io/IOBuf.h>
  21. #include <folly/io/async/AsyncTimeout.h>
  22. #include <folly/io/async/AsyncUDPServerSocket.h>
  23. #include <folly/io/async/AsyncUDPSocket.h>
  24. #include <folly/io/async/EventBase.h>
  25. #include <folly/portability/GMock.h>
  26. #include <folly/portability/GTest.h>
  27. #include <folly/portability/Sockets.h>
  28. using folly::AsyncTimeout;
  29. using folly::AsyncUDPServerSocket;
  30. using folly::AsyncUDPSocket;
  31. using folly::errnoStr;
  32. using folly::EventBase;
  33. using folly::IOBuf;
  34. using folly::SocketAddress;
  35. using namespace testing;
  36. class UDPAcceptor : public AsyncUDPServerSocket::Callback {
  37. public:
  38. UDPAcceptor(EventBase* evb, int n, bool changePortForWrites)
  39. : evb_(evb), n_(n), changePortForWrites_(changePortForWrites) {}
  40. void onListenStarted() noexcept override {}
  41. void onListenStopped() noexcept override {}
  42. void onDataAvailable(
  43. std::shared_ptr<folly::AsyncUDPSocket> socket,
  44. const folly::SocketAddress& client,
  45. std::unique_ptr<folly::IOBuf> data,
  46. bool truncated) noexcept override {
  47. lastClient_ = client;
  48. lastMsg_ = data->clone()->moveToFbString().toStdString();
  49. auto len = data->computeChainDataLength();
  50. VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
  51. << "(trun:" << truncated << ") from " << client.describe() << " - "
  52. << lastMsg_;
  53. sendPong(socket);
  54. }
  55. void sendPong(std::shared_ptr<folly::AsyncUDPSocket> socket) noexcept {
  56. try {
  57. auto writeSocket = socket;
  58. if (changePortForWrites_) {
  59. writeSocket = std::make_shared<folly::AsyncUDPSocket>(evb_);
  60. writeSocket->setReuseAddr(false);
  61. writeSocket->bind(folly::SocketAddress("127.0.0.1", 0));
  62. }
  63. writeSocket->write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
  64. } catch (const std::exception& ex) {
  65. VLOG(4) << "Failed to send PONG " << ex.what();
  66. }
  67. }
  68. private:
  69. EventBase* const evb_{nullptr};
  70. const int n_{-1};
  71. // Whether to create a new port per write.
  72. bool changePortForWrites_{true};
  73. folly::SocketAddress lastClient_;
  74. std::string lastMsg_;
  75. };
  76. class UDPServer {
  77. public:
  78. UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
  79. : evb_(evb), addr_(addr), evbs_(n) {}
  80. void start() {
  81. CHECK(evb_->isInEventBaseThread());
  82. socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
  83. try {
  84. socket_->bind(addr_);
  85. VLOG(4) << "Server listening on " << socket_->address().describe();
  86. } catch (const std::exception& ex) {
  87. LOG(FATAL) << ex.what();
  88. }
  89. acceptors_.reserve(evbs_.size());
  90. threads_.reserve(evbs_.size());
  91. // Add numWorkers thread
  92. int i = 0;
  93. for (auto& evb : evbs_) {
  94. acceptors_.emplace_back(&evb, i, changePortForWrites_);
  95. std::thread t([&]() { evb.loopForever(); });
  96. evb.waitUntilRunning();
  97. socket_->addListener(&evb, &acceptors_[i]);
  98. threads_.emplace_back(std::move(t));
  99. ++i;
  100. }
  101. socket_->listen();
  102. }
  103. folly::SocketAddress address() const {
  104. return socket_->address();
  105. }
  106. void shutdown() {
  107. CHECK(evb_->isInEventBaseThread());
  108. socket_->close();
  109. socket_.reset();
  110. for (auto& evb : evbs_) {
  111. evb.terminateLoopSoon();
  112. }
  113. for (auto& t : threads_) {
  114. t.join();
  115. }
  116. }
  117. void pauseAccepting() {
  118. socket_->pauseAccepting();
  119. }
  120. void resumeAccepting() {
  121. socket_->resumeAccepting();
  122. }
  123. // Whether writes from the UDP server should change the port for each message.
  124. void setChangePortForWrites(bool changePortForWrites) {
  125. changePortForWrites_ = changePortForWrites;
  126. }
  127. private:
  128. EventBase* const evb_{nullptr};
  129. const folly::SocketAddress addr_;
  130. std::unique_ptr<AsyncUDPServerSocket> socket_;
  131. std::vector<std::thread> threads_;
  132. std::vector<folly::EventBase> evbs_;
  133. std::vector<UDPAcceptor> acceptors_;
  134. bool changePortForWrites_{true};
  135. };
  136. class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
  137. public:
  138. explicit UDPClient(EventBase* evb) : AsyncTimeout(evb), evb_(evb) {}
  139. void start(const folly::SocketAddress& server, int n) {
  140. CHECK(evb_->isInEventBaseThread());
  141. server_ = server;
  142. socket_ = std::make_unique<AsyncUDPSocket>(evb_);
  143. try {
  144. socket_->bind(folly::SocketAddress("127.0.0.1", 0));
  145. if (connectAddr_) {
  146. connect();
  147. }
  148. VLOG(2) << "Client bound to " << socket_->address().describe();
  149. } catch (const std::exception& ex) {
  150. LOG(FATAL) << ex.what();
  151. }
  152. socket_->resumeRead(this);
  153. n_ = n;
  154. // Start playing ping pong
  155. sendPing();
  156. }
  157. void connect() {
  158. int ret = socket_->connect(*connectAddr_);
  159. if (ret != 0) {
  160. throw folly::AsyncSocketException(
  161. folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
  162. }
  163. VLOG(2) << "Client connected to address=" << *connectAddr_;
  164. }
  165. void shutdown() {
  166. CHECK(evb_->isInEventBaseThread());
  167. socket_->pauseRead();
  168. socket_->close();
  169. socket_.reset();
  170. evb_->terminateLoopSoon();
  171. }
  172. void sendPing() {
  173. if (n_ == 0) {
  174. shutdown();
  175. return;
  176. }
  177. --n_;
  178. scheduleTimeout(5);
  179. writePing(folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
  180. }
  181. virtual void writePing(std::unique_ptr<folly::IOBuf> buf) {
  182. socket_->write(server_, std::move(buf));
  183. }
  184. void getReadBuffer(void** buf, size_t* len) noexcept override {
  185. *buf = buf_;
  186. *len = 1024;
  187. }
  188. void onDataAvailable(
  189. const folly::SocketAddress& client,
  190. size_t len,
  191. bool truncated) noexcept override {
  192. VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
  193. << client.describe() << " - " << std::string(buf_, len);
  194. VLOG(4) << n_ << " left";
  195. ++pongRecvd_;
  196. sendPing();
  197. }
  198. void onReadError(const folly::AsyncSocketException& ex) noexcept override {
  199. VLOG(4) << ex.what();
  200. // Start listening for next PONG
  201. socket_->resumeRead(this);
  202. }
  203. void onReadClosed() noexcept override {
  204. CHECK(false) << "We unregister reads before closing";
  205. }
  206. void timeoutExpired() noexcept override {
  207. VLOG(4) << "Timeout expired";
  208. sendPing();
  209. }
  210. int pongRecvd() const {
  211. return pongRecvd_;
  212. }
  213. AsyncUDPSocket& getSocket() {
  214. return *socket_;
  215. }
  216. void setShouldConnect(const folly::SocketAddress& connectAddr) {
  217. connectAddr_ = connectAddr;
  218. }
  219. protected:
  220. folly::Optional<folly::SocketAddress> connectAddr_;
  221. EventBase* const evb_{nullptr};
  222. folly::SocketAddress server_;
  223. std::unique_ptr<AsyncUDPSocket> socket_;
  224. private:
  225. int pongRecvd_{0};
  226. int n_{0};
  227. char buf_[1024];
  228. };
  229. class ConnectedWriteUDPClient : public UDPClient {
  230. public:
  231. ~ConnectedWriteUDPClient() override = default;
  232. ConnectedWriteUDPClient(EventBase* evb) : UDPClient(evb) {}
  233. // When the socket is connected you don't need to supply the address to send
  234. // msg. This will test that connect worked.
  235. void writePing(std::unique_ptr<folly::IOBuf> buf) override {
  236. iovec vec[16];
  237. size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0]));
  238. if (UNLIKELY(iovec_len == 0)) {
  239. buf->coalesce();
  240. vec[0].iov_base = const_cast<uint8_t*>(buf->data());
  241. vec[0].iov_len = buf->length();
  242. iovec_len = 1;
  243. }
  244. struct msghdr msg;
  245. msg.msg_name = nullptr;
  246. msg.msg_namelen = 0;
  247. msg.msg_iov = const_cast<struct iovec*>(vec);
  248. msg.msg_iovlen = iovec_len;
  249. msg.msg_control = nullptr;
  250. msg.msg_controllen = 0;
  251. msg.msg_flags = 0;
  252. ssize_t ret = ::sendmsg(socket_->getFD(), &msg, 0);
  253. if (ret == -1) {
  254. if (errno != EAGAIN || errno != EWOULDBLOCK) {
  255. throw folly::AsyncSocketException(
  256. folly::AsyncSocketException::NOT_OPEN, "WriteFail", errno);
  257. }
  258. }
  259. connect();
  260. }
  261. };
  262. class AsyncSocketIntegrationTest : public Test {
  263. public:
  264. void SetUp() override {
  265. server = std::make_unique<UDPServer>(
  266. &sevb, folly::SocketAddress("127.0.0.1", 0), 4);
  267. // Start event loop in a separate thread
  268. serverThread =
  269. std::make_unique<std::thread>([this]() { sevb.loopForever(); });
  270. // Wait for event loop to start
  271. sevb.waitUntilRunning();
  272. }
  273. void startServer() {
  274. // Start the server
  275. sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
  276. LOG(INFO) << "Server listening=" << server->address();
  277. }
  278. void TearDown() override {
  279. // Shutdown server
  280. sevb.runInEventBaseThread([&]() {
  281. server->shutdown();
  282. sevb.terminateLoopSoon();
  283. });
  284. // Wait for server thread to join
  285. serverThread->join();
  286. }
  287. std::unique_ptr<UDPClient> performPingPongTest(
  288. folly::Optional<folly::SocketAddress> connectedAddress,
  289. bool useConnectedWrite);
  290. folly::EventBase sevb;
  291. folly::EventBase cevb;
  292. std::unique_ptr<std::thread> serverThread;
  293. std::unique_ptr<UDPServer> server;
  294. std::unique_ptr<UDPClient> client;
  295. };
  296. std::unique_ptr<UDPClient> AsyncSocketIntegrationTest::performPingPongTest(
  297. folly::Optional<folly::SocketAddress> connectedAddress,
  298. bool useConnectedWrite) {
  299. if (useConnectedWrite) {
  300. CHECK(connectedAddress.hasValue());
  301. client = std::make_unique<ConnectedWriteUDPClient>(&cevb);
  302. client->setShouldConnect(*connectedAddress);
  303. } else {
  304. client = std::make_unique<UDPClient>(&cevb);
  305. if (connectedAddress) {
  306. client->setShouldConnect(*connectedAddress);
  307. }
  308. }
  309. // Start event loop in a separate thread
  310. auto clientThread = std::thread([this]() { cevb.loopForever(); });
  311. // Wait for event loop to start
  312. cevb.waitUntilRunning();
  313. // Send ping
  314. cevb.runInEventBaseThread([&]() { client->start(server->address(), 100); });
  315. // Wait for client to finish
  316. clientThread.join();
  317. return std::move(client);
  318. }
  319. TEST_F(AsyncSocketIntegrationTest, PingPong) {
  320. startServer();
  321. auto pingClient = performPingPongTest(folly::none, false);
  322. // This should succeed.
  323. ASSERT_GT(pingClient->pongRecvd(), 0);
  324. }
  325. TEST_F(AsyncSocketIntegrationTest, ConnectedPingPong) {
  326. server->setChangePortForWrites(false);
  327. startServer();
  328. auto pingClient = performPingPongTest(server->address(), false);
  329. // This should succeed
  330. ASSERT_GT(pingClient->pongRecvd(), 0);
  331. }
  332. TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongServerWrongAddress) {
  333. server->setChangePortForWrites(true);
  334. startServer();
  335. auto pingClient = performPingPongTest(server->address(), false);
  336. // This should fail.
  337. ASSERT_EQ(pingClient->pongRecvd(), 0);
  338. }
  339. TEST_F(AsyncSocketIntegrationTest, ConnectedPingPongClientWrongAddress) {
  340. server->setChangePortForWrites(false);
  341. startServer();
  342. folly::SocketAddress connectAddr(
  343. server->address().getIPAddress(), server->address().getPort() + 1);
  344. auto pingClient = performPingPongTest(connectAddr, false);
  345. // This should fail.
  346. ASSERT_EQ(pingClient->pongRecvd(), 0);
  347. }
  348. TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsg) {
  349. server->setChangePortForWrites(false);
  350. startServer();
  351. auto pingClient = performPingPongTest(server->address(), true);
  352. // This should succeed.
  353. ASSERT_GT(pingClient->pongRecvd(), 0);
  354. }
  355. TEST_F(AsyncSocketIntegrationTest, PingPongUseConnectedSendMsgServerWrongAddr) {
  356. server->setChangePortForWrites(true);
  357. startServer();
  358. auto pingClient = performPingPongTest(server->address(), true);
  359. // This should fail.
  360. ASSERT_EQ(pingClient->pongRecvd(), 0);
  361. }
  362. TEST_F(AsyncSocketIntegrationTest, PingPongPauseResumeListening) {
  363. startServer();
  364. // Exchange should not happen when paused.
  365. server->pauseAccepting();
  366. auto pausedClient = performPingPongTest(folly::none, false);
  367. ASSERT_EQ(pausedClient->pongRecvd(), 0);
  368. // Exchange does occur after resuming.
  369. server->resumeAccepting();
  370. auto pingClient = performPingPongTest(folly::none, false);
  371. ASSERT_GT(pingClient->pongRecvd(), 0);
  372. }
  373. class TestAsyncUDPSocket : public AsyncUDPSocket {
  374. public:
  375. explicit TestAsyncUDPSocket(EventBase* evb) : AsyncUDPSocket(evb) {}
  376. MOCK_METHOD3(sendmsg, ssize_t(int, const struct msghdr*, int));
  377. };
  378. class MockErrMessageCallback : public AsyncUDPSocket::ErrMessageCallback {
  379. public:
  380. ~MockErrMessageCallback() override = default;
  381. MOCK_METHOD1(errMessage_, void(const cmsghdr&));
  382. void errMessage(const cmsghdr& cmsg) noexcept override {
  383. errMessage_(cmsg);
  384. }
  385. MOCK_METHOD1(errMessageError_, void(const folly::AsyncSocketException&));
  386. void errMessageError(
  387. const folly::AsyncSocketException& ex) noexcept override {
  388. errMessageError_(ex);
  389. }
  390. };
  391. class MockUDPReadCallback : public AsyncUDPSocket::ReadCallback {
  392. public:
  393. ~MockUDPReadCallback() override = default;
  394. MOCK_METHOD2(getReadBuffer_, void(void**, size_t*));
  395. void getReadBuffer(void** buf, size_t* len) noexcept override {
  396. getReadBuffer_(buf, len);
  397. }
  398. MOCK_METHOD3(
  399. onDataAvailable_,
  400. void(const folly::SocketAddress&, size_t, bool));
  401. void onDataAvailable(
  402. const folly::SocketAddress& client,
  403. size_t len,
  404. bool truncated) noexcept override {
  405. onDataAvailable_(client, len, truncated);
  406. }
  407. MOCK_METHOD1(onReadError_, void(const folly::AsyncSocketException&));
  408. void onReadError(const folly::AsyncSocketException& ex) noexcept override {
  409. onReadError_(ex);
  410. }
  411. MOCK_METHOD0(onReadClosed_, void());
  412. void onReadClosed() noexcept override {
  413. onReadClosed_();
  414. }
  415. };
  416. class AsyncUDPSocketTest : public Test {
  417. public:
  418. void SetUp() override {
  419. socket_ = std::make_shared<AsyncUDPSocket>(&evb_);
  420. addr_ = folly::SocketAddress("127.0.0.1", 0);
  421. socket_->bind(addr_);
  422. }
  423. EventBase evb_;
  424. MockErrMessageCallback err;
  425. MockUDPReadCallback readCb;
  426. std::shared_ptr<AsyncUDPSocket> socket_;
  427. folly::SocketAddress addr_;
  428. };
  429. TEST_F(AsyncUDPSocketTest, TestConnect) {
  430. EXPECT_EQ(socket_->connect(addr_), 0);
  431. }
  432. TEST_F(AsyncUDPSocketTest, TestErrToNonExistentServer) {
  433. socket_->resumeRead(&readCb);
  434. socket_->setErrMessageCallback(&err);
  435. folly::SocketAddress addr("127.0.0.1", 10000);
  436. bool errRecvd = false;
  437. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  438. EXPECT_CALL(err, errMessage_(_))
  439. .WillOnce(Invoke([this, &errRecvd](auto& cmsg) {
  440. if ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
  441. (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
  442. const struct sock_extended_err* serr =
  443. reinterpret_cast<const struct sock_extended_err*>(
  444. CMSG_DATA(&cmsg));
  445. errRecvd =
  446. (serr->ee_origin == SO_EE_ORIGIN_ICMP || SO_EE_ORIGIN_ICMP6);
  447. LOG(ERROR) << "errno " << errnoStr(serr->ee_errno);
  448. }
  449. evb_.terminateLoopSoon();
  450. }));
  451. #endif // FOLLY_HAVE_MSG_ERRQUEUE
  452. socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
  453. evb_.loopForever();
  454. EXPECT_TRUE(errRecvd);
  455. }
  456. TEST_F(AsyncUDPSocketTest, TestUnsetErrCallback) {
  457. socket_->resumeRead(&readCb);
  458. socket_->setErrMessageCallback(&err);
  459. socket_->setErrMessageCallback(nullptr);
  460. folly::SocketAddress addr("127.0.0.1", 10000);
  461. EXPECT_CALL(err, errMessage_(_)).Times(0);
  462. socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
  463. evb_.timer().scheduleTimeoutFn(
  464. [&] { evb_.terminateLoopSoon(); }, std::chrono::milliseconds(30));
  465. evb_.loopForever();
  466. }
  467. TEST_F(AsyncUDPSocketTest, CloseInErrorCallback) {
  468. socket_->resumeRead(&readCb);
  469. socket_->setErrMessageCallback(&err);
  470. folly::SocketAddress addr("127.0.0.1", 10000);
  471. bool errRecvd = false;
  472. EXPECT_CALL(err, errMessage_(_)).WillOnce(Invoke([this, &errRecvd](auto&) {
  473. errRecvd = true;
  474. socket_->close();
  475. evb_.terminateLoopSoon();
  476. }));
  477. socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
  478. socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
  479. evb_.loopForever();
  480. EXPECT_TRUE(errRecvd);
  481. }
  482. TEST_F(AsyncUDPSocketTest, TestNonExistentServerNoErrCb) {
  483. socket_->resumeRead(&readCb);
  484. folly::SocketAddress addr("127.0.0.1", 10000);
  485. bool errRecvd = false;
  486. folly::IOBufQueue readBuf;
  487. EXPECT_CALL(readCb, getReadBuffer_(_, _))
  488. .WillRepeatedly(Invoke([&readBuf](void** buf, size_t* len) {
  489. auto readSpace = readBuf.preallocate(2000, 10000);
  490. *buf = readSpace.first;
  491. *len = readSpace.second;
  492. }));
  493. ON_CALL(readCb, onReadError_(_)).WillByDefault(Invoke([&errRecvd](auto& ex) {
  494. LOG(ERROR) << ex.what();
  495. errRecvd = true;
  496. }));
  497. socket_->write(addr, folly::IOBuf::copyBuffer("hey"));
  498. evb_.timer().scheduleTimeoutFn(
  499. [&] { evb_.terminateLoopSoon(); }, std::chrono::milliseconds(30));
  500. evb_.loopForever();
  501. EXPECT_FALSE(errRecvd);
  502. }
  503. TEST_F(AsyncUDPSocketTest, TestBound) {
  504. AsyncUDPSocket socket(&evb_);
  505. EXPECT_FALSE(socket.isBound());
  506. folly::SocketAddress address("0.0.0.0", 0);
  507. socket.bind(address);
  508. EXPECT_TRUE(socket.isBound());
  509. }
  510. TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbWithReadCallback) {
  511. socket_->resumeRead(&readCb);
  512. EXPECT_TRUE(socket_->isHandlerRegistered());
  513. socket_->detachEventBase();
  514. EXPECT_FALSE(socket_->isHandlerRegistered());
  515. socket_->attachEventBase(&evb_);
  516. EXPECT_TRUE(socket_->isHandlerRegistered());
  517. }
  518. TEST_F(AsyncUDPSocketTest, TestAttachAfterDetachEvbNoReadCallback) {
  519. EXPECT_FALSE(socket_->isHandlerRegistered());
  520. socket_->detachEventBase();
  521. EXPECT_FALSE(socket_->isHandlerRegistered());
  522. socket_->attachEventBase(&evb_);
  523. EXPECT_FALSE(socket_->isHandlerRegistered());
  524. }
  525. TEST_F(AsyncUDPSocketTest, TestDetachAttach) {
  526. folly::EventBase evb2;
  527. auto writeSocket = std::make_shared<folly::AsyncUDPSocket>(&evb_);
  528. folly::SocketAddress address("127.0.0.1", 0);
  529. writeSocket->bind(address);
  530. std::array<uint8_t, 1024> data;
  531. std::atomic<int> packetsRecvd{0};
  532. EXPECT_CALL(readCb, getReadBuffer_(_, _))
  533. .WillRepeatedly(Invoke([&](void** buf, size_t* len) {
  534. *buf = data.data();
  535. *len = 1024;
  536. }));
  537. EXPECT_CALL(readCb, onDataAvailable_(_, _, _))
  538. .WillRepeatedly(Invoke(
  539. [&](const folly::SocketAddress&, size_t, bool) { packetsRecvd++; }));
  540. socket_->resumeRead(&readCb);
  541. writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
  542. while (packetsRecvd != 1) {
  543. evb_.loopOnce();
  544. }
  545. EXPECT_EQ(packetsRecvd, 1);
  546. socket_->detachEventBase();
  547. std::thread t([&] { evb2.loopForever(); });
  548. evb2.runInEventBaseThreadAndWait([&] { socket_->attachEventBase(&evb2); });
  549. writeSocket->write(socket_->address(), folly::IOBuf::copyBuffer("hello"));
  550. auto now = std::chrono::steady_clock::now();
  551. while (packetsRecvd != 2 ||
  552. std::chrono::steady_clock::now() <
  553. now + std::chrono::milliseconds(10)) {
  554. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  555. }
  556. evb2.runInEventBaseThread([&] {
  557. socket_ = nullptr;
  558. evb2.terminateLoopSoon();
  559. });
  560. t.join();
  561. EXPECT_EQ(packetsRecvd, 2);
  562. }