AsyncSocketTest2.cpp 115 KB


  1. /*
  2. * Copyright 2010-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/async/test/AsyncSocketTest2.h>
  17. #include <folly/ExceptionWrapper.h>
  18. #include <folly/Random.h>
  19. #include <folly/SocketAddress.h>
  20. #include <folly/io/async/AsyncSocket.h>
  21. #include <folly/io/async/AsyncTimeout.h>
  22. #include <folly/io/async/EventBase.h>
  23. #include <folly/io/async/ScopedEventBaseThread.h>
  24. #include <folly/experimental/TestUtil.h>
  25. #include <folly/io/IOBuf.h>
  26. #include <folly/io/async/test/AsyncSocketTest.h>
  27. #include <folly/io/async/test/Util.h>
  28. #include <folly/portability/GMock.h>
  29. #include <folly/portability/GTest.h>
  30. #include <folly/portability/Sockets.h>
  31. #include <folly/portability/Unistd.h>
  32. #include <folly/synchronization/Baton.h>
  33. #include <folly/test/SocketAddressTestHelper.h>
  34. #include <fcntl.h>
  35. #include <sys/types.h>
  36. #include <iostream>
  37. #include <memory>
  38. #include <thread>
  39. using std::cerr;
  40. using std::endl;
  41. using std::min;
  42. using std::string;
  43. using std::unique_ptr;
  44. using std::vector;
  45. using std::chrono::milliseconds;
  46. using namespace folly;
  47. using namespace folly::test;
  48. using namespace testing;
  49. namespace fsp = folly::portability::sockets;
  50. class DelayedWrite : public AsyncTimeout {
  51. public:
  52. DelayedWrite(
  53. const std::shared_ptr<AsyncSocket>& socket,
  54. unique_ptr<IOBuf>&& bufs,
  55. AsyncTransportWrapper::WriteCallback* wcb,
  56. bool cork,
  57. bool lastWrite = false)
  58. : AsyncTimeout(socket->getEventBase()),
  59. socket_(socket),
  60. bufs_(std::move(bufs)),
  61. wcb_(wcb),
  62. cork_(cork),
  63. lastWrite_(lastWrite) {}
  64. private:
  65. void timeoutExpired() noexcept override {
  66. WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
  67. socket_->writeChain(wcb_, std::move(bufs_), flags);
  68. if (lastWrite_) {
  69. socket_->shutdownWrite();
  70. }
  71. }
  72. std::shared_ptr<AsyncSocket> socket_;
  73. unique_ptr<IOBuf> bufs_;
  74. AsyncTransportWrapper::WriteCallback* wcb_;
  75. bool cork_;
  76. bool lastWrite_;
  77. };
  78. ///////////////////////////////////////////////////////////////////////////
  79. // connect() tests
  80. ///////////////////////////////////////////////////////////////////////////
  81. /**
  82. * Test connecting to a server
  83. */
  84. TEST(AsyncSocketTest, Connect) {
  85. // Start listening on a local port
  86. TestServer server;
  87. // Connect using a AsyncSocket
  88. EventBase evb;
  89. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  90. ConnCallback cb;
  91. const auto startedAt = std::chrono::steady_clock::now();
  92. socket->connect(&cb, server.getAddress(), 30);
  93. evb.loop();
  94. const auto finishedAt = std::chrono::steady_clock::now();
  95. ASSERT_EQ(cb.state, STATE_SUCCEEDED);
  96. EXPECT_LE(0, socket->getConnectTime().count());
  97. EXPECT_GE(socket->getConnectStartTime(), startedAt);
  98. EXPECT_LE(socket->getConnectStartTime(), socket->getConnectEndTime());
  99. EXPECT_LE(socket->getConnectEndTime(), finishedAt);
  100. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  101. }
  102. enum class TFOState {
  103. DISABLED,
  104. ENABLED,
  105. };
  106. class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
  107. std::vector<TFOState> getTestingValues() {
  108. std::vector<TFOState> vals;
  109. vals.emplace_back(TFOState::DISABLED);
  110. #if FOLLY_ALLOW_TFO
  111. vals.emplace_back(TFOState::ENABLED);
  112. #endif
  113. return vals;
  114. }
  115. INSTANTIATE_TEST_CASE_P(
  116. ConnectTests,
  117. AsyncSocketConnectTest,
  118. ::testing::ValuesIn(getTestingValues()));
  119. /**
  120. * Test connecting to a server that isn't listening
  121. */
  122. TEST(AsyncSocketTest, ConnectRefused) {
  123. EventBase evb;
  124. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  125. // Hopefully nothing is actually listening on this address
  126. folly::SocketAddress addr("127.0.0.1", 65535);
  127. ConnCallback cb;
  128. socket->connect(&cb, addr, 30);
  129. evb.loop();
  130. EXPECT_EQ(STATE_FAILED, cb.state);
  131. EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
  132. EXPECT_LE(0, socket->getConnectTime().count());
  133. EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
  134. }
  135. /**
  136. * Test connection timeout
  137. */
  138. TEST(AsyncSocketTest, ConnectTimeout) {
  139. EventBase evb;
  140. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  141. // Try connecting to server that won't respond.
  142. //
  143. // This depends somewhat on the network where this test is run.
  144. // Hopefully this IP will be routable but unresponsive.
  145. // (Alternatively, we could try listening on a local raw socket, but that
  146. // normally requires root privileges.)
  147. auto host = SocketAddressTestHelper::isIPv6Enabled()
  148. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
  149. : SocketAddressTestHelper::isIPv4Enabled()
  150. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
  151. : nullptr;
  152. SocketAddress addr(host, 65535);
  153. ConnCallback cb;
  154. socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
  155. evb.loop();
  156. ASSERT_EQ(cb.state, STATE_FAILED);
  157. if (cb.exception.getType() == AsyncSocketException::NOT_OPEN) {
  158. // This can happen if we could not route to the IP address picked above.
  159. // In this case the connect will fail immediately rather than timing out.
  160. // Just skip the test in this case.
  161. SKIP() << "do not have a routable but unreachable IP address";
  162. }
  163. ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
  164. // Verify that we can still get the peer address after a timeout.
  165. // Use case is if the client was created from a client pool, and we want
  166. // to log which peer failed.
  167. folly::SocketAddress peer;
  168. socket->getPeerAddress(&peer);
  169. ASSERT_EQ(peer, addr);
  170. EXPECT_LE(0, socket->getConnectTime().count());
  171. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
  172. }
  173. /**
  174. * Test writing immediately after connecting, without waiting for connect
  175. * to finish.
  176. */
  177. TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
  178. TestServer server;
  179. // connect()
  180. EventBase evb;
  181. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  182. if (GetParam() == TFOState::ENABLED) {
  183. socket->enableTFO();
  184. }
  185. ConnCallback ccb;
  186. socket->connect(&ccb, server.getAddress(), 30);
  187. // write()
  188. char buf[128];
  189. memset(buf, 'a', sizeof(buf));
  190. WriteCallback wcb;
  191. socket->write(&wcb, buf, sizeof(buf));
  192. // Loop. We don't bother accepting on the server socket yet.
  193. // The kernel should be able to buffer the write request so it can succeed.
  194. evb.loop();
  195. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  196. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  197. // Make sure the server got a connection and received the data
  198. socket->close();
  199. server.verifyConnection(buf, sizeof(buf));
  200. ASSERT_TRUE(socket->isClosedBySelf());
  201. ASSERT_FALSE(socket->isClosedByPeer());
  202. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  203. }
  204. /**
  205. * Test connecting using a nullptr connect callback.
  206. */
  207. TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
  208. TestServer server;
  209. // connect()
  210. EventBase evb;
  211. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  212. if (GetParam() == TFOState::ENABLED) {
  213. socket->enableTFO();
  214. }
  215. socket->connect(nullptr, server.getAddress(), 30);
  216. // write some data, just so we have some way of verifing
  217. // that the socket works correctly after connecting
  218. char buf[128];
  219. memset(buf, 'a', sizeof(buf));
  220. WriteCallback wcb;
  221. socket->write(&wcb, buf, sizeof(buf));
  222. evb.loop();
  223. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  224. // Make sure the server got a connection and received the data
  225. socket->close();
  226. server.verifyConnection(buf, sizeof(buf));
  227. ASSERT_TRUE(socket->isClosedBySelf());
  228. ASSERT_FALSE(socket->isClosedByPeer());
  229. }
  230. /**
  231. * Test calling both write() and close() immediately after connecting, without
  232. * waiting for connect to finish.
  233. *
  234. * This exercises the STATE_CONNECTING_CLOSING code.
  235. */
  236. TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
  237. TestServer server;
  238. // connect()
  239. EventBase evb;
  240. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  241. if (GetParam() == TFOState::ENABLED) {
  242. socket->enableTFO();
  243. }
  244. ConnCallback ccb;
  245. socket->connect(&ccb, server.getAddress(), 30);
  246. // write()
  247. char buf[128];
  248. memset(buf, 'a', sizeof(buf));
  249. WriteCallback wcb;
  250. socket->write(&wcb, buf, sizeof(buf));
  251. // close()
  252. socket->close();
  253. // Loop. We don't bother accepting on the server socket yet.
  254. // The kernel should be able to buffer the write request so it can succeed.
  255. evb.loop();
  256. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  257. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  258. // Make sure the server got a connection and received the data
  259. server.verifyConnection(buf, sizeof(buf));
  260. ASSERT_TRUE(socket->isClosedBySelf());
  261. ASSERT_FALSE(socket->isClosedByPeer());
  262. }
  263. /**
  264. * Test calling close() immediately after connect()
  265. */
  266. TEST(AsyncSocketTest, ConnectAndClose) {
  267. TestServer server;
  268. // Connect using a AsyncSocket
  269. EventBase evb;
  270. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  271. ConnCallback ccb;
  272. socket->connect(&ccb, server.getAddress(), 30);
  273. // Hopefully the connect didn't succeed immediately.
  274. // If it did, we can't exercise the close-while-connecting code path.
  275. if (ccb.state == STATE_SUCCEEDED) {
  276. LOG(INFO) << "connect() succeeded immediately; aborting test "
  277. "of close-during-connect behavior";
  278. return;
  279. }
  280. socket->close();
  281. // Loop, although there shouldn't be anything to do.
  282. evb.loop();
  283. // Make sure the connection was aborted
  284. ASSERT_EQ(ccb.state, STATE_FAILED);
  285. ASSERT_TRUE(socket->isClosedBySelf());
  286. ASSERT_FALSE(socket->isClosedByPeer());
  287. }
  288. /**
  289. * Test calling closeNow() immediately after connect()
  290. *
  291. * This should be identical to the normal close behavior.
  292. */
  293. TEST(AsyncSocketTest, ConnectAndCloseNow) {
  294. TestServer server;
  295. // Connect using a AsyncSocket
  296. EventBase evb;
  297. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  298. ConnCallback ccb;
  299. socket->connect(&ccb, server.getAddress(), 30);
  300. // Hopefully the connect didn't succeed immediately.
  301. // If it did, we can't exercise the close-while-connecting code path.
  302. if (ccb.state == STATE_SUCCEEDED) {
  303. LOG(INFO) << "connect() succeeded immediately; aborting test "
  304. "of closeNow()-during-connect behavior";
  305. return;
  306. }
  307. socket->closeNow();
  308. // Loop, although there shouldn't be anything to do.
  309. evb.loop();
  310. // Make sure the connection was aborted
  311. ASSERT_EQ(ccb.state, STATE_FAILED);
  312. ASSERT_TRUE(socket->isClosedBySelf());
  313. ASSERT_FALSE(socket->isClosedByPeer());
  314. }
  315. /**
  316. * Test calling both write() and closeNow() immediately after connecting,
  317. * without waiting for connect to finish.
  318. *
  319. * This should abort the pending write.
  320. */
  321. TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
  322. TestServer server;
  323. // connect()
  324. EventBase evb;
  325. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  326. ConnCallback ccb;
  327. socket->connect(&ccb, server.getAddress(), 30);
  328. // Hopefully the connect didn't succeed immediately.
  329. // If it did, we can't exercise the close-while-connecting code path.
  330. if (ccb.state == STATE_SUCCEEDED) {
  331. LOG(INFO) << "connect() succeeded immediately; aborting test "
  332. "of write-during-connect behavior";
  333. return;
  334. }
  335. // write()
  336. char buf[128];
  337. memset(buf, 'a', sizeof(buf));
  338. WriteCallback wcb;
  339. socket->write(&wcb, buf, sizeof(buf));
  340. // close()
  341. socket->closeNow();
  342. // Loop, although there shouldn't be anything to do.
  343. evb.loop();
  344. ASSERT_EQ(ccb.state, STATE_FAILED);
  345. ASSERT_EQ(wcb.state, STATE_FAILED);
  346. ASSERT_TRUE(socket->isClosedBySelf());
  347. ASSERT_FALSE(socket->isClosedByPeer());
  348. }
  349. /**
  350. * Test installing a read callback immediately, before connect() finishes.
  351. */
  352. TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
  353. TestServer server;
  354. // connect()
  355. EventBase evb;
  356. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  357. if (GetParam() == TFOState::ENABLED) {
  358. socket->enableTFO();
  359. }
  360. ConnCallback ccb;
  361. socket->connect(&ccb, server.getAddress(), 30);
  362. ReadCallback rcb;
  363. socket->setReadCB(&rcb);
  364. if (GetParam() == TFOState::ENABLED) {
  365. // Trigger a connection
  366. socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
  367. }
  368. // Even though we haven't looped yet, we should be able to accept
  369. // the connection and send data to it.
  370. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  371. uint8_t buf[128];
  372. memset(buf, 'a', sizeof(buf));
  373. acceptedSocket->write(buf, sizeof(buf));
  374. acceptedSocket->flush();
  375. acceptedSocket->close();
  376. // Loop, although there shouldn't be anything to do.
  377. evb.loop();
  378. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  379. ASSERT_EQ(rcb.buffers.size(), 1);
  380. ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
  381. ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
  382. ASSERT_FALSE(socket->isClosedBySelf());
  383. ASSERT_FALSE(socket->isClosedByPeer());
  384. }
  385. /**
  386. * Test installing a read callback and then closing immediately before the
  387. * connect attempt finishes.
  388. */
  389. TEST(AsyncSocketTest, ConnectReadAndClose) {
  390. TestServer server;
  391. // connect()
  392. EventBase evb;
  393. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  394. ConnCallback ccb;
  395. socket->connect(&ccb, server.getAddress(), 30);
  396. // Hopefully the connect didn't succeed immediately.
  397. // If it did, we can't exercise the close-while-connecting code path.
  398. if (ccb.state == STATE_SUCCEEDED) {
  399. LOG(INFO) << "connect() succeeded immediately; aborting test "
  400. "of read-during-connect behavior";
  401. return;
  402. }
  403. ReadCallback rcb;
  404. socket->setReadCB(&rcb);
  405. // close()
  406. socket->close();
  407. // Loop, although there shouldn't be anything to do.
  408. evb.loop();
  409. ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
  410. ASSERT_EQ(rcb.buffers.size(), 0);
  411. ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
  412. ASSERT_TRUE(socket->isClosedBySelf());
  413. ASSERT_FALSE(socket->isClosedByPeer());
  414. }
  415. /**
  416. * Test both writing and installing a read callback immediately,
  417. * before connect() finishes.
  418. */
  419. TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
  420. TestServer server;
  421. // connect()
  422. EventBase evb;
  423. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  424. if (GetParam() == TFOState::ENABLED) {
  425. socket->enableTFO();
  426. }
  427. ConnCallback ccb;
  428. socket->connect(&ccb, server.getAddress(), 30);
  429. // write()
  430. char buf1[128];
  431. memset(buf1, 'a', sizeof(buf1));
  432. WriteCallback wcb;
  433. socket->write(&wcb, buf1, sizeof(buf1));
  434. // set a read callback
  435. ReadCallback rcb;
  436. socket->setReadCB(&rcb);
  437. // Even though we haven't looped yet, we should be able to accept
  438. // the connection and send data to it.
  439. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  440. uint8_t buf2[128];
  441. memset(buf2, 'b', sizeof(buf2));
  442. acceptedSocket->write(buf2, sizeof(buf2));
  443. acceptedSocket->flush();
  444. // shut down the write half of acceptedSocket, so that the AsyncSocket
  445. // will stop reading and we can break out of the event loop.
  446. shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
  447. // Loop
  448. evb.loop();
  449. // Make sure the connect succeeded
  450. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  451. // Make sure the AsyncSocket read the data written by the accepted socket
  452. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  453. ASSERT_EQ(rcb.buffers.size(), 1);
  454. ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
  455. ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
  456. // Close the AsyncSocket so we'll see EOF on acceptedSocket
  457. socket->close();
  458. // Make sure the accepted socket saw the data written by the AsyncSocket
  459. uint8_t readbuf[sizeof(buf1)];
  460. acceptedSocket->readAll(readbuf, sizeof(readbuf));
  461. ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
  462. uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
  463. ASSERT_EQ(bytesRead, 0);
  464. ASSERT_FALSE(socket->isClosedBySelf());
  465. ASSERT_TRUE(socket->isClosedByPeer());
  466. }
  467. /**
  468. * Test writing to the socket then shutting down writes before the connect
  469. * attempt finishes.
  470. */
  471. TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
  472. TestServer server;
  473. // connect()
  474. EventBase evb;
  475. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  476. ConnCallback ccb;
  477. socket->connect(&ccb, server.getAddress(), 30);
  478. // Hopefully the connect didn't succeed immediately.
  479. // If it did, we can't exercise the write-while-connecting code path.
  480. if (ccb.state == STATE_SUCCEEDED) {
  481. LOG(INFO) << "connect() succeeded immediately; skipping test";
  482. return;
  483. }
  484. // Ask to write some data
  485. char wbuf[128];
  486. memset(wbuf, 'a', sizeof(wbuf));
  487. WriteCallback wcb;
  488. socket->write(&wcb, wbuf, sizeof(wbuf));
  489. socket->shutdownWrite();
  490. // Shutdown writes
  491. socket->shutdownWrite();
  492. // Even though we haven't looped yet, we should be able to accept
  493. // the connection.
  494. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  495. // Since the connection is still in progress, there should be no data to
  496. // read yet. Verify that the accepted socket is not readable.
  497. struct pollfd fds[1];
  498. fds[0].fd = acceptedSocket->getSocketFD();
  499. fds[0].events = POLLIN;
  500. fds[0].revents = 0;
  501. int rc = poll(fds, 1, 0);
  502. ASSERT_EQ(rc, 0);
  503. // Write data to the accepted socket
  504. uint8_t acceptedWbuf[192];
  505. memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
  506. acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
  507. acceptedSocket->flush();
  508. // Loop
  509. evb.loop();
  510. // The loop should have completed the connection, written the queued data,
  511. // and shutdown writes on the socket.
  512. //
  513. // Check that the connection was completed successfully and that the write
  514. // callback succeeded.
  515. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  516. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  517. // Check that we can read the data that was written to the socket, and that
  518. // we see an EOF, since its socket was half-shutdown.
  519. uint8_t readbuf[sizeof(wbuf)];
  520. acceptedSocket->readAll(readbuf, sizeof(readbuf));
  521. ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
  522. uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
  523. ASSERT_EQ(bytesRead, 0);
  524. // Close the accepted socket. This will cause it to see EOF
  525. // and uninstall the read callback when we loop next.
  526. acceptedSocket->close();
  527. // Install a read callback, then loop again.
  528. ReadCallback rcb;
  529. socket->setReadCB(&rcb);
  530. evb.loop();
  531. // This loop should have read the data and seen the EOF
  532. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  533. ASSERT_EQ(rcb.buffers.size(), 1);
  534. ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
  535. ASSERT_EQ(
  536. memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
  537. ASSERT_FALSE(socket->isClosedBySelf());
  538. ASSERT_FALSE(socket->isClosedByPeer());
  539. }
  540. /**
  541. * Test reading, writing, and shutting down writes before the connect attempt
  542. * finishes.
  543. */
  544. TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
  545. TestServer server;
  546. // connect()
  547. EventBase evb;
  548. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  549. ConnCallback ccb;
  550. socket->connect(&ccb, server.getAddress(), 30);
  551. // Hopefully the connect didn't succeed immediately.
  552. // If it did, we can't exercise the write-while-connecting code path.
  553. if (ccb.state == STATE_SUCCEEDED) {
  554. LOG(INFO) << "connect() succeeded immediately; skipping test";
  555. return;
  556. }
  557. // Install a read callback
  558. ReadCallback rcb;
  559. socket->setReadCB(&rcb);
  560. // Ask to write some data
  561. char wbuf[128];
  562. memset(wbuf, 'a', sizeof(wbuf));
  563. WriteCallback wcb;
  564. socket->write(&wcb, wbuf, sizeof(wbuf));
  565. // Shutdown writes
  566. socket->shutdownWrite();
  567. // Even though we haven't looped yet, we should be able to accept
  568. // the connection.
  569. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  570. // Since the connection is still in progress, there should be no data to
  571. // read yet. Verify that the accepted socket is not readable.
  572. struct pollfd fds[1];
  573. fds[0].fd = acceptedSocket->getSocketFD();
  574. fds[0].events = POLLIN;
  575. fds[0].revents = 0;
  576. int rc = poll(fds, 1, 0);
  577. ASSERT_EQ(rc, 0);
  578. // Write data to the accepted socket
  579. uint8_t acceptedWbuf[192];
  580. memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
  581. acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
  582. acceptedSocket->flush();
  583. // Shutdown writes to the accepted socket. This will cause it to see EOF
  584. // and uninstall the read callback.
  585. shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
  586. // Loop
  587. evb.loop();
  588. // The loop should have completed the connection, written the queued data,
  589. // shutdown writes on the socket, read the data we wrote to it, and see the
  590. // EOF.
  591. //
  592. // Check that the connection was completed successfully and that the read
  593. // and write callbacks were invoked as expected.
  594. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  595. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  596. ASSERT_EQ(rcb.buffers.size(), 1);
  597. ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
  598. ASSERT_EQ(
  599. memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
  600. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  601. // Check that we can read the data that was written to the socket, and that
  602. // we see an EOF, since its socket was half-shutdown.
  603. uint8_t readbuf[sizeof(wbuf)];
  604. acceptedSocket->readAll(readbuf, sizeof(readbuf));
  605. ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
  606. uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
  607. ASSERT_EQ(bytesRead, 0);
  608. // Fully close both sockets
  609. acceptedSocket->close();
  610. socket->close();
  611. ASSERT_FALSE(socket->isClosedBySelf());
  612. ASSERT_TRUE(socket->isClosedByPeer());
  613. }
  614. /**
  615. * Test reading, writing, and calling shutdownWriteNow() before the
  616. * connect attempt finishes.
  617. */
  618. TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
  619. TestServer server;
  620. // connect()
  621. EventBase evb;
  622. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  623. ConnCallback ccb;
  624. socket->connect(&ccb, server.getAddress(), 30);
  625. // Hopefully the connect didn't succeed immediately.
  626. // If it did, we can't exercise the write-while-connecting code path.
  627. if (ccb.state == STATE_SUCCEEDED) {
  628. LOG(INFO) << "connect() succeeded immediately; skipping test";
  629. return;
  630. }
  631. // Install a read callback
  632. ReadCallback rcb;
  633. socket->setReadCB(&rcb);
  634. // Ask to write some data
  635. char wbuf[128];
  636. memset(wbuf, 'a', sizeof(wbuf));
  637. WriteCallback wcb;
  638. socket->write(&wcb, wbuf, sizeof(wbuf));
  639. // Shutdown writes immediately.
  640. // This should immediately discard the data that we just tried to write.
  641. socket->shutdownWriteNow();
  642. // Verify that writeError() was invoked on the write callback.
  643. ASSERT_EQ(wcb.state, STATE_FAILED);
  644. ASSERT_EQ(wcb.bytesWritten, 0);
  645. // Even though we haven't looped yet, we should be able to accept
  646. // the connection.
  647. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  648. // Since the connection is still in progress, there should be no data to
  649. // read yet. Verify that the accepted socket is not readable.
  650. struct pollfd fds[1];
  651. fds[0].fd = acceptedSocket->getSocketFD();
  652. fds[0].events = POLLIN;
  653. fds[0].revents = 0;
  654. int rc = poll(fds, 1, 0);
  655. ASSERT_EQ(rc, 0);
  656. // Write data to the accepted socket
  657. uint8_t acceptedWbuf[192];
  658. memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
  659. acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
  660. acceptedSocket->flush();
  661. // Shutdown writes to the accepted socket. This will cause it to see EOF
  662. // and uninstall the read callback.
  663. shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
  664. // Loop
  665. evb.loop();
  666. // The loop should have completed the connection, written the queued data,
  667. // shutdown writes on the socket, read the data we wrote to it, and see the
  668. // EOF.
  669. //
  670. // Check that the connection was completed successfully and that the read
  671. // callback was invoked as expected.
  672. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  673. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  674. ASSERT_EQ(rcb.buffers.size(), 1);
  675. ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
  676. ASSERT_EQ(
  677. memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0);
  678. // Since we used shutdownWriteNow(), it should have discarded all pending
  679. // write data. Verify we see an immediate EOF when reading from the accepted
  680. // socket.
  681. uint8_t readbuf[sizeof(wbuf)];
  682. uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
  683. ASSERT_EQ(bytesRead, 0);
  684. // Fully close both sockets
  685. acceptedSocket->close();
  686. socket->close();
  687. ASSERT_FALSE(socket->isClosedBySelf());
  688. ASSERT_TRUE(socket->isClosedByPeer());
  689. }
  690. // Helper function for use in testConnectOptWrite()
  691. // Temporarily disable the read callback
  692. void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
  693. // Uninstall the read callback
  694. socket->setReadCB(nullptr);
  695. // Schedule the read callback to be reinstalled after 1ms
  696. socket->getEventBase()->runInLoop(
  697. std::bind(&AsyncSocket::setReadCB, socket, rcb));
  698. }
  699. /**
  700. * Test connect+write, then have the connect callback perform another write.
  701. *
  702. * This tests interaction of the optimistic writing after connect with
  703. * additional write attempts that occur in the connect callback.
  704. */
  705. void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
  706. TestServer server;
  707. EventBase evb;
  708. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  709. // connect()
  710. ConnCallback ccb;
  711. socket->connect(&ccb, server.getAddress(), 30);
  712. // Hopefully the connect didn't succeed immediately.
  713. // If it did, we can't exercise the optimistic write code path.
  714. if (ccb.state == STATE_SUCCEEDED) {
  715. LOG(INFO) << "connect() succeeded immediately; aborting test "
  716. "of optimistic write behavior";
  717. return;
  718. }
  719. // Tell the connect callback to perform a write when the connect succeeds
  720. WriteCallback wcb2;
  721. std::unique_ptr<char[]> buf2(new char[size2]);
  722. memset(buf2.get(), 'b', size2);
  723. if (size2 > 0) {
  724. ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
  725. // Tell the second write callback to close the connection when it is done
  726. wcb2.successCallback = [&] { socket->closeNow(); };
  727. }
  728. // Schedule one write() immediately, before the connect finishes
  729. std::unique_ptr<char[]> buf1(new char[size1]);
  730. memset(buf1.get(), 'a', size1);
  731. WriteCallback wcb1;
  732. if (size1 > 0) {
  733. socket->write(&wcb1, buf1.get(), size1);
  734. }
  735. if (close) {
  736. // immediately perform a close, before connect() completes
  737. socket->close();
  738. }
  739. // Start reading from the other endpoint after 10ms.
  740. // If we're using large buffers, we have to read so that the writes don't
  741. // block forever.
  742. std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
  743. ReadCallback rcb;
  744. rcb.dataAvailableCallback =
  745. std::bind(tmpDisableReads, acceptedSocket.get(), &rcb);
  746. socket->getEventBase()->tryRunAfterDelay(
  747. std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb), 10);
  748. // Loop. We don't bother accepting on the server socket yet.
  749. // The kernel should be able to buffer the write request so it can succeed.
  750. evb.loop();
  751. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  752. if (size1 > 0) {
  753. ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
  754. }
  755. if (size2 > 0) {
  756. ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
  757. }
  758. socket->close();
  759. // Make sure the read callback received all of the data
  760. size_t bytesRead = 0;
  761. for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
  762. it != rcb.buffers.end();
  763. ++it) {
  764. size_t start = bytesRead;
  765. bytesRead += it->length;
  766. size_t end = bytesRead;
  767. if (start < size1) {
  768. size_t cmpLen = min(size1, end) - start;
  769. ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
  770. }
  771. if (end > size1 && end <= size1 + size2) {
  772. size_t itOffset;
  773. size_t buf2Offset;
  774. size_t cmpLen;
  775. if (start >= size1) {
  776. itOffset = 0;
  777. buf2Offset = start - size1;
  778. cmpLen = end - start;
  779. } else {
  780. itOffset = size1 - start;
  781. buf2Offset = 0;
  782. cmpLen = end - size1;
  783. }
  784. ASSERT_EQ(
  785. memcmp(it->buffer + itOffset, buf2.get() + buf2Offset, cmpLen), 0);
  786. }
  787. }
  788. ASSERT_EQ(bytesRead, size1 + size2);
  789. }
  790. TEST(AsyncSocketTest, ConnectCallbackWrite) {
  791. // Test using small writes that should both succeed immediately
  792. testConnectOptWrite(100, 200);
  793. // Test using a large buffer in the connect callback, that should block
  794. const size_t largeSize = 32 * 1024 * 1024;
  795. testConnectOptWrite(100, largeSize);
  796. // Test using a large initial write
  797. testConnectOptWrite(largeSize, 100);
  798. // Test using two large buffers
  799. testConnectOptWrite(largeSize, largeSize);
  800. // Test a small write in the connect callback,
  801. // but no immediate write before connect completes
  802. testConnectOptWrite(0, 64);
  803. // Test a large write in the connect callback,
  804. // but no immediate write before connect completes
  805. testConnectOptWrite(0, largeSize);
  806. // Test connect, a small write, then immediately call close() before connect
  807. // completes
  808. testConnectOptWrite(211, 0, true);
  809. // Test connect, a large immediate write (that will block), then immediately
  810. // call close() before connect completes
  811. testConnectOptWrite(largeSize, 0, true);
  812. }
  813. ///////////////////////////////////////////////////////////////////////////
  814. // write() related tests
  815. ///////////////////////////////////////////////////////////////////////////
  816. /**
  817. * Test writing using a nullptr callback
  818. */
  819. TEST(AsyncSocketTest, WriteNullCallback) {
  820. TestServer server;
  821. // connect()
  822. EventBase evb;
  823. std::shared_ptr<AsyncSocket> socket =
  824. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  825. evb.loop(); // loop until the socket is connected
  826. // write() with a nullptr callback
  827. char buf[128];
  828. memset(buf, 'a', sizeof(buf));
  829. socket->write(nullptr, buf, sizeof(buf));
  830. evb.loop(); // loop until the data is sent
  831. // Make sure the server got a connection and received the data
  832. socket->close();
  833. server.verifyConnection(buf, sizeof(buf));
  834. ASSERT_TRUE(socket->isClosedBySelf());
  835. ASSERT_FALSE(socket->isClosedByPeer());
  836. }
  837. /**
  838. * Test writing with a send timeout
  839. */
  840. TEST(AsyncSocketTest, WriteTimeout) {
  841. TestServer server;
  842. // connect()
  843. EventBase evb;
  844. std::shared_ptr<AsyncSocket> socket =
  845. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  846. evb.loop(); // loop until the socket is connected
  847. // write() a large chunk of data, with no-one on the other end reading.
  848. // Tricky: the kernel caches the connection metrics for recently-used
  849. // routes (see tcp_no_metrics_save) so a freshly opened connection can
  850. // have a send buffer size bigger than wmem_default. This makes the test
  851. // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
  852. size_t writeLength = 32 * 1024 * 1024;
  853. uint32_t timeout = 200;
  854. socket->setSendTimeout(timeout);
  855. std::unique_ptr<char[]> buf(new char[writeLength]);
  856. memset(buf.get(), 'a', writeLength);
  857. WriteCallback wcb;
  858. socket->write(&wcb, buf.get(), writeLength);
  859. TimePoint start;
  860. evb.loop();
  861. TimePoint end;
  862. // Make sure the write attempt timed out as requested
  863. ASSERT_EQ(wcb.state, STATE_FAILED);
  864. ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
  865. // Check that the write timed out within a reasonable period of time.
  866. // We don't check for exactly the specified timeout, since AsyncSocket only
  867. // times out when it hasn't made progress for that period of time.
  868. //
  869. // On linux, the first write sends a few hundred kb of data, then blocks for
  870. // writability, and then unblocks again after 40ms and is able to write
  871. // another smaller of data before blocking permanently. Therefore it doesn't
  872. // time out until 40ms + timeout.
  873. //
  874. // I haven't fully verified the cause of this, but I believe it probably
  875. // occurs because the receiving end delays sending an ack for up to 40ms.
  876. // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
  877. // the ack, it can send some more data. However, after that point the
  878. // receiver's kernel buffer is full. This 40ms delay happens even with
  879. // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
  880. // kernel may be automatically disabling TCP_QUICKACK after receiving some
  881. // data.
  882. //
  883. // For now, we simply check that the timeout occurred within 160ms of
  884. // the requested value.
  885. T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
  886. }
  887. /**
  888. * Test writing to a socket that the remote endpoint has closed
  889. */
  890. TEST(AsyncSocketTest, WritePipeError) {
  891. TestServer server;
  892. // connect()
  893. EventBase evb;
  894. std::shared_ptr<AsyncSocket> socket =
  895. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  896. socket->setSendTimeout(1000);
  897. evb.loop(); // loop until the socket is connected
  898. // accept and immediately close the socket
  899. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  900. acceptedSocket->close();
  901. // write() a large chunk of data
  902. size_t writeLength = 32 * 1024 * 1024;
  903. std::unique_ptr<char[]> buf(new char[writeLength]);
  904. memset(buf.get(), 'a', writeLength);
  905. WriteCallback wcb;
  906. socket->write(&wcb, buf.get(), writeLength);
  907. evb.loop();
  908. // Make sure the write failed.
  909. // It would be nice if AsyncSocketException could convey the errno value,
  910. // so that we could check for EPIPE
  911. ASSERT_EQ(wcb.state, STATE_FAILED);
  912. ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::INTERNAL_ERROR);
  913. ASSERT_FALSE(socket->isClosedBySelf());
  914. ASSERT_FALSE(socket->isClosedByPeer());
  915. }
  916. /**
  917. * Test writing to a socket that has its read side closed
  918. */
  919. TEST(AsyncSocketTest, WriteAfterReadEOF) {
  920. TestServer server;
  921. // connect()
  922. EventBase evb;
  923. std::shared_ptr<AsyncSocket> socket =
  924. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  925. evb.loop(); // loop until the socket is connected
  926. // Accept the connection
  927. std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
  928. ReadCallback rcb;
  929. acceptedSocket->setReadCB(&rcb);
  930. // Shutdown the write side of client socket (read side of server socket)
  931. socket->shutdownWrite();
  932. evb.loop();
  933. // Check that accepted socket is still writable
  934. ASSERT_FALSE(acceptedSocket->good());
  935. ASSERT_TRUE(acceptedSocket->writable());
  936. // Write data to accepted socket
  937. constexpr size_t simpleBufLength = 5;
  938. char simpleBuf[simpleBufLength];
  939. memset(simpleBuf, 'a', simpleBufLength);
  940. WriteCallback wcb;
  941. acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
  942. evb.loop();
  943. // Make sure we were able to write even after getting a read EOF
  944. ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
  945. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  946. }
  947. /**
  948. * Test that bytes written is correctly computed in case of write failure
  949. */
  950. TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
  951. // Send and receive buffer sizes for the sockets.
  952. // Note that Linux will double this value to allow space for bookkeeping
  953. // overhead.
  954. constexpr size_t kSockBufSize = 8 * 1024;
  955. constexpr size_t kEffectiveSockBufSize = 2 * kSockBufSize;
  956. TestServer server(false, kSockBufSize);
  957. AsyncSocket::OptionMap options{
  958. {{SOL_SOCKET, SO_SNDBUF}, kSockBufSize},
  959. {{SOL_SOCKET, SO_RCVBUF}, kSockBufSize},
  960. {{IPPROTO_TCP, TCP_NODELAY}, 1},
  961. };
  962. // The current thread will be used by the receiver - use a separate thread
  963. // for the sender.
  964. EventBase senderEvb;
  965. std::thread senderThread([&]() { senderEvb.loopForever(); });
  966. ConnCallback ccb;
  967. std::shared_ptr<AsyncSocket> socket;
  968. senderEvb.runInEventBaseThreadAndWait([&]() {
  969. socket = AsyncSocket::newSocket(&senderEvb);
  970. socket->connect(&ccb, server.getAddress(), 30, options);
  971. });
  972. // accept the socket on the server side
  973. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  974. // Send a big (100KB) write so that it is partially written.
  975. constexpr size_t kSendSize = 100 * 1024;
  976. auto const sendBuf = std::vector<char>(kSendSize, 'a');
  977. WriteCallback wcb;
  978. senderEvb.runInEventBaseThreadAndWait(
  979. [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
  980. // Read 20KB of data from the socket to allow the sender to send a bit more
  981. // data after it initially blocks.
  982. constexpr size_t kRecvSize = 20 * 1024;
  983. uint8_t recvBuf[kRecvSize];
  984. auto bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
  985. ASSERT_EQ(kRecvSize, bytesRead);
  986. EXPECT_EQ(0, memcmp(recvBuf, sendBuf.data(), bytesRead));
  987. // We should be able to send at least the amount of data received plus the
  988. // send buffer size. In practice we should probably be able to send
  989. constexpr size_t kMinExpectedBytesWritten = kRecvSize + kSockBufSize;
  990. // We shouldn't be able to send more than the amount of data received plus
  991. // the send buffer size of the sending socket (kEffectiveSockBufSize) plus
  992. // the receive buffer size on the receiving socket (kEffectiveSockBufSize)
  993. constexpr size_t kMaxExpectedBytesWritten =
  994. kRecvSize + kEffectiveSockBufSize + kEffectiveSockBufSize;
  995. static_assert(
  996. kMaxExpectedBytesWritten < kSendSize, "kSendSize set too small");
  997. // Need to delay after receiving 20KB and before closing the receive side so
  998. // that the send side has a chance to fill the send buffer past.
  999. using clock = std::chrono::steady_clock;
  1000. auto const deadline = clock::now() + std::chrono::seconds(2);
  1001. while (wcb.bytesWritten < kMinExpectedBytesWritten &&
  1002. clock::now() < deadline) {
  1003. std::this_thread::yield();
  1004. }
  1005. acceptedSocket->closeWithReset();
  1006. senderEvb.terminateLoopSoon();
  1007. senderThread.join();
  1008. ASSERT_EQ(STATE_FAILED, wcb.state);
  1009. ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
  1010. ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
  1011. }
  1012. /**
  1013. * Test writing a mix of simple buffers and IOBufs
  1014. */
  1015. TEST(AsyncSocketTest, WriteIOBuf) {
  1016. TestServer server;
  1017. // connect()
  1018. EventBase evb;
  1019. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  1020. ConnCallback ccb;
  1021. socket->connect(&ccb, server.getAddress(), 30);
  1022. // Accept the connection
  1023. std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
  1024. ReadCallback rcb;
  1025. acceptedSocket->setReadCB(&rcb);
  1026. // Check if EOR tracking flag can be set and reset.
  1027. EXPECT_FALSE(socket->isEorTrackingEnabled());
  1028. socket->setEorTracking(true);
  1029. EXPECT_TRUE(socket->isEorTrackingEnabled());
  1030. socket->setEorTracking(false);
  1031. EXPECT_FALSE(socket->isEorTrackingEnabled());
  1032. // Write a simple buffer to the socket
  1033. constexpr size_t simpleBufLength = 5;
  1034. char simpleBuf[simpleBufLength];
  1035. memset(simpleBuf, 'a', simpleBufLength);
  1036. WriteCallback wcb;
  1037. socket->write(&wcb, simpleBuf, simpleBufLength);
  1038. // Write a single-element IOBuf chain
  1039. size_t buf1Length = 7;
  1040. unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
  1041. memset(buf1->writableData(), 'b', buf1Length);
  1042. buf1->append(buf1Length);
  1043. unique_ptr<IOBuf> buf1Copy(buf1->clone());
  1044. WriteCallback wcb2;
  1045. socket->writeChain(&wcb2, std::move(buf1));
  1046. // Write a multiple-element IOBuf chain
  1047. size_t buf2Length = 11;
  1048. unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
  1049. memset(buf2->writableData(), 'c', buf2Length);
  1050. buf2->append(buf2Length);
  1051. size_t buf3Length = 13;
  1052. unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
  1053. memset(buf3->writableData(), 'd', buf3Length);
  1054. buf3->append(buf3Length);
  1055. buf2->appendChain(std::move(buf3));
  1056. unique_ptr<IOBuf> buf2Copy(buf2->clone());
  1057. buf2Copy->coalesce();
  1058. WriteCallback wcb3;
  1059. socket->writeChain(&wcb3, std::move(buf2));
  1060. socket->shutdownWrite();
  1061. // Let the reads and writes run to completion
  1062. evb.loop();
  1063. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  1064. ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
  1065. ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
  1066. // Make sure the reader got the right data in the right order
  1067. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  1068. ASSERT_EQ(rcb.buffers.size(), 1);
  1069. ASSERT_EQ(
  1070. rcb.buffers[0].length,
  1071. simpleBufLength + buf1Length + buf2Length + buf3Length);
  1072. ASSERT_EQ(memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
  1073. ASSERT_EQ(
  1074. memcmp(
  1075. rcb.buffers[0].buffer + simpleBufLength,
  1076. buf1Copy->data(),
  1077. buf1Copy->length()),
  1078. 0);
  1079. ASSERT_EQ(
  1080. memcmp(
  1081. rcb.buffers[0].buffer + simpleBufLength + buf1Length,
  1082. buf2Copy->data(),
  1083. buf2Copy->length()),
  1084. 0);
  1085. acceptedSocket->close();
  1086. socket->close();
  1087. ASSERT_TRUE(socket->isClosedBySelf());
  1088. ASSERT_FALSE(socket->isClosedByPeer());
  1089. }
  1090. TEST(AsyncSocketTest, WriteIOBufCorked) {
  1091. TestServer server;
  1092. // connect()
  1093. EventBase evb;
  1094. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  1095. ConnCallback ccb;
  1096. socket->connect(&ccb, server.getAddress(), 30);
  1097. // Accept the connection
  1098. std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
  1099. ReadCallback rcb;
  1100. acceptedSocket->setReadCB(&rcb);
  1101. // Do three writes, 100ms apart, with the "cork" flag set
  1102. // on the second write. The reader should see the first write
  1103. // arrive by itself, followed by the second and third writes
  1104. // arriving together.
  1105. size_t buf1Length = 5;
  1106. unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
  1107. memset(buf1->writableData(), 'a', buf1Length);
  1108. buf1->append(buf1Length);
  1109. size_t buf2Length = 7;
  1110. unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
  1111. memset(buf2->writableData(), 'b', buf2Length);
  1112. buf2->append(buf2Length);
  1113. size_t buf3Length = 11;
  1114. unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
  1115. memset(buf3->writableData(), 'c', buf3Length);
  1116. buf3->append(buf3Length);
  1117. WriteCallback wcb1;
  1118. socket->writeChain(&wcb1, std::move(buf1));
  1119. WriteCallback wcb2;
  1120. DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
  1121. write2.scheduleTimeout(100);
  1122. WriteCallback wcb3;
  1123. DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
  1124. write3.scheduleTimeout(140);
  1125. evb.loop();
  1126. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  1127. ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
  1128. ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
  1129. if (wcb3.state != STATE_SUCCEEDED) {
  1130. throw(wcb3.exception);
  1131. }
  1132. ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
  1133. // Make sure the reader got the data with the right grouping
  1134. ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
  1135. ASSERT_EQ(rcb.buffers.size(), 2);
  1136. ASSERT_EQ(rcb.buffers[0].length, buf1Length);
  1137. ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
  1138. acceptedSocket->close();
  1139. socket->close();
  1140. ASSERT_TRUE(socket->isClosedBySelf());
  1141. ASSERT_FALSE(socket->isClosedByPeer());
  1142. }
  1143. /**
  1144. * Test performing a zero-length write
  1145. */
  1146. TEST(AsyncSocketTest, ZeroLengthWrite) {
  1147. TestServer server;
  1148. // connect()
  1149. EventBase evb;
  1150. std::shared_ptr<AsyncSocket> socket =
  1151. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  1152. evb.loop(); // loop until the socket is connected
  1153. auto acceptedSocket = server.acceptAsync(&evb);
  1154. ReadCallback rcb;
  1155. acceptedSocket->setReadCB(&rcb);
  1156. size_t len1 = 1024 * 1024;
  1157. size_t len2 = 1024 * 1024;
  1158. std::unique_ptr<char[]> buf(new char[len1 + len2]);
  1159. memset(buf.get(), 'a', len1);
  1160. memset(buf.get(), 'b', len2);
  1161. WriteCallback wcb1;
  1162. WriteCallback wcb2;
  1163. WriteCallback wcb3;
  1164. WriteCallback wcb4;
  1165. socket->write(&wcb1, buf.get(), 0);
  1166. socket->write(&wcb2, buf.get(), len1);
  1167. socket->write(&wcb3, buf.get() + len1, 0);
  1168. socket->write(&wcb4, buf.get() + len1, len2);
  1169. socket->close();
  1170. evb.loop(); // loop until the data is sent
  1171. ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
  1172. ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
  1173. ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
  1174. ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
  1175. rcb.verifyData(buf.get(), len1 + len2);
  1176. ASSERT_TRUE(socket->isClosedBySelf());
  1177. ASSERT_FALSE(socket->isClosedByPeer());
  1178. }
  1179. TEST(AsyncSocketTest, ZeroLengthWritev) {
  1180. TestServer server;
  1181. // connect()
  1182. EventBase evb;
  1183. std::shared_ptr<AsyncSocket> socket =
  1184. AsyncSocket::newSocket(&evb, server.getAddress(), 30);
  1185. evb.loop(); // loop until the socket is connected
  1186. auto acceptedSocket = server.acceptAsync(&evb);
  1187. ReadCallback rcb;
  1188. acceptedSocket->setReadCB(&rcb);
  1189. size_t len1 = 1024 * 1024;
  1190. size_t len2 = 1024 * 1024;
  1191. std::unique_ptr<char[]> buf(new char[len1 + len2]);
  1192. memset(buf.get(), 'a', len1);
  1193. memset(buf.get(), 'b', len2);
  1194. WriteCallback wcb;
  1195. constexpr size_t iovCount = 4;
  1196. struct iovec iov[iovCount];
  1197. iov[0].iov_base = buf.get();
  1198. iov[0].iov_len = len1;
  1199. iov[1].iov_base = buf.get() + len1;
  1200. iov[1].iov_len = 0;
  1201. iov[2].iov_base = buf.get() + len1;
  1202. iov[2].iov_len = len2;
  1203. iov[3].iov_base = buf.get() + len1 + len2;
  1204. iov[3].iov_len = 0;
  1205. socket->writev(&wcb, iov, iovCount);
  1206. socket->close();
  1207. evb.loop(); // loop until the data is sent
  1208. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  1209. rcb.verifyData(buf.get(), len1 + len2);
  1210. ASSERT_TRUE(socket->isClosedBySelf());
  1211. ASSERT_FALSE(socket->isClosedByPeer());
  1212. }
  1213. ///////////////////////////////////////////////////////////////////////////
  1214. // close() related tests
  1215. ///////////////////////////////////////////////////////////////////////////
  1216. /**
  1217. * Test calling close() with pending writes when the socket is already closing.
  1218. */
  1219. TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
  1220. TestServer server;
  1221. // connect()
  1222. EventBase evb;
  1223. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  1224. ConnCallback ccb;
  1225. socket->connect(&ccb, server.getAddress(), 30);
  1226. // accept the socket on the server side
  1227. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  1228. // Loop to ensure the connect has completed
  1229. evb.loop();
  1230. // Make sure we are connected
  1231. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  1232. // Schedule pending writes, until several write attempts have blocked
  1233. char buf[128];
  1234. memset(buf, 'a', sizeof(buf));
  1235. typedef vector<std::shared_ptr<WriteCallback>> WriteCallbackVector;
  1236. WriteCallbackVector writeCallbacks;
  1237. writeCallbacks.reserve(5);
  1238. while (writeCallbacks.size() < 5) {
  1239. std::shared_ptr<WriteCallback> wcb(new WriteCallback);
  1240. socket->write(wcb.get(), buf, sizeof(buf));
  1241. if (wcb->state == STATE_SUCCEEDED) {
  1242. // Succeeded immediately. Keep performing more writes
  1243. continue;
  1244. }
  1245. // This write is blocked.
  1246. // Have the write callback call close() when writeError() is invoked
  1247. wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
  1248. writeCallbacks.push_back(wcb);
  1249. }
  1250. // Call closeNow() to immediately fail the pending writes
  1251. socket->closeNow();
  1252. // Make sure writeError() was invoked on all of the pending write callbacks
  1253. for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
  1254. it != writeCallbacks.end();
  1255. ++it) {
  1256. ASSERT_EQ((*it)->state, STATE_FAILED);
  1257. }
  1258. ASSERT_TRUE(socket->isClosedBySelf());
  1259. ASSERT_FALSE(socket->isClosedByPeer());
  1260. }
  1261. ///////////////////////////////////////////////////////////////////////////
  1262. // ImmediateRead related tests
  1263. ///////////////////////////////////////////////////////////////////////////
  1264. /* AsyncSocket use to verify immediate read works */
  1265. class AsyncSocketImmediateRead : public folly::AsyncSocket {
  1266. public:
  1267. bool immediateReadCalled = false;
  1268. explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
  1269. protected:
  1270. void checkForImmediateRead() noexcept override {
  1271. immediateReadCalled = true;
  1272. AsyncSocket::handleRead();
  1273. }
  1274. };
  1275. TEST(AsyncSocket, ConnectReadImmediateRead) {
  1276. TestServer server;
  1277. const size_t maxBufferSz = 100;
  1278. const size_t maxReadsPerEvent = 1;
  1279. const size_t expectedDataSz = maxBufferSz * 3;
  1280. char expectedData[expectedDataSz];
  1281. memset(expectedData, 'j', expectedDataSz);
  1282. EventBase evb;
  1283. ReadCallback rcb(maxBufferSz);
  1284. AsyncSocketImmediateRead socket(&evb);
  1285. socket.connect(nullptr, server.getAddress(), 30);
  1286. evb.loop(); // loop until the socket is connected
  1287. socket.setReadCB(&rcb);
  1288. socket.setMaxReadsPerEvent(maxReadsPerEvent);
  1289. socket.immediateReadCalled = false;
  1290. auto acceptedSocket = server.acceptAsync(&evb);
  1291. ReadCallback rcbServer;
  1292. WriteCallback wcbServer;
  1293. rcbServer.dataAvailableCallback = [&]() {
  1294. if (rcbServer.dataRead() == expectedDataSz) {
  1295. // write back all data read
  1296. rcbServer.verifyData(expectedData, expectedDataSz);
  1297. acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
  1298. acceptedSocket->close();
  1299. }
  1300. };
  1301. acceptedSocket->setReadCB(&rcbServer);
  1302. // write data
  1303. WriteCallback wcb1;
  1304. socket.write(&wcb1, expectedData, expectedDataSz);
  1305. evb.loop();
  1306. ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
  1307. rcb.verifyData(expectedData, expectedDataSz);
  1308. ASSERT_EQ(socket.immediateReadCalled, true);
  1309. ASSERT_FALSE(socket.isClosedBySelf());
  1310. ASSERT_FALSE(socket.isClosedByPeer());
  1311. }
  1312. TEST(AsyncSocket, ConnectReadUninstallRead) {
  1313. TestServer server;
  1314. const size_t maxBufferSz = 100;
  1315. const size_t maxReadsPerEvent = 1;
  1316. const size_t expectedDataSz = maxBufferSz * 3;
  1317. char expectedData[expectedDataSz];
  1318. memset(expectedData, 'k', expectedDataSz);
  1319. EventBase evb;
  1320. ReadCallback rcb(maxBufferSz);
  1321. AsyncSocketImmediateRead socket(&evb);
  1322. socket.connect(nullptr, server.getAddress(), 30);
  1323. evb.loop(); // loop until the socket is connected
  1324. socket.setReadCB(&rcb);
  1325. socket.setMaxReadsPerEvent(maxReadsPerEvent);
  1326. socket.immediateReadCalled = false;
  1327. auto acceptedSocket = server.acceptAsync(&evb);
  1328. ReadCallback rcbServer;
  1329. WriteCallback wcbServer;
  1330. rcbServer.dataAvailableCallback = [&]() {
  1331. if (rcbServer.dataRead() == expectedDataSz) {
  1332. // write back all data read
  1333. rcbServer.verifyData(expectedData, expectedDataSz);
  1334. acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
  1335. acceptedSocket->close();
  1336. }
  1337. };
  1338. acceptedSocket->setReadCB(&rcbServer);
  1339. rcb.dataAvailableCallback = [&]() {
  1340. // we read data and reset readCB
  1341. socket.setReadCB(nullptr);
  1342. };
  1343. // write data
  1344. WriteCallback wcb;
  1345. socket.write(&wcb, expectedData, expectedDataSz);
  1346. evb.loop();
  1347. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  1348. /* we shoud've only read maxBufferSz data since readCallback_
  1349. * was reset in dataAvailableCallback */
  1350. ASSERT_EQ(rcb.dataRead(), maxBufferSz);
  1351. ASSERT_EQ(socket.immediateReadCalled, false);
  1352. ASSERT_FALSE(socket.isClosedBySelf());
  1353. ASSERT_FALSE(socket.isClosedByPeer());
  1354. }
  1355. // TODO:
  1356. // - Test connect() and have the connect callback set the read callback
  1357. // - Test connect() and have the connect callback unset the read callback
  1358. // - Test reading/writing/closing/destroying the socket in the connect callback
  1359. // - Test reading/writing/closing/destroying the socket in the read callback
  1360. // - Test reading/writing/closing/destroying the socket in the write callback
  1361. // - Test one-way shutdown behavior
  1362. // - Test changing the EventBase
  1363. //
  1364. // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
  1365. // in connectSuccess(), readDataAvailable(), writeSuccess()
  1366. ///////////////////////////////////////////////////////////////////////////
  1367. // AsyncServerSocket tests
  1368. ///////////////////////////////////////////////////////////////////////////
  1369. /**
  1370. * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
  1371. */
  1372. TEST(AsyncSocketTest, ServerAcceptOptions) {
  1373. EventBase eventBase;
  1374. // Create a server socket
  1375. std::shared_ptr<AsyncServerSocket> serverSocket(
  1376. AsyncServerSocket::newSocket(&eventBase));
  1377. serverSocket->bind(0);
  1378. serverSocket->listen(16);
  1379. folly::SocketAddress serverAddress;
  1380. serverSocket->getAddress(&serverAddress);
  1381. // Add a callback to accept one connection then stop the loop
  1382. TestAcceptCallback acceptCallback;
  1383. acceptCallback.setConnectionAcceptedFn(
  1384. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1385. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  1386. });
  1387. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1388. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  1389. });
  1390. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  1391. serverSocket->startAccepting();
  1392. // Connect to the server socket
  1393. std::shared_ptr<AsyncSocket> socket(
  1394. AsyncSocket::newSocket(&eventBase, serverAddress));
  1395. eventBase.loop();
  1396. // Verify that the server accepted a connection
  1397. ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
  1398. ASSERT_EQ(
  1399. acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1400. ASSERT_EQ(
  1401. acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1402. ASSERT_EQ(
  1403. acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1404. int fd = acceptCallback.getEvents()->at(1).fd;
  1405. // The accepted connection should already be in non-blocking mode
  1406. int flags = fcntl(fd, F_GETFL, 0);
  1407. ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
  1408. #ifndef TCP_NOPUSH
  1409. // The accepted connection should already have TCP_NODELAY set
  1410. int value;
  1411. socklen_t valueLength = sizeof(value);
  1412. int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
  1413. ASSERT_EQ(rc, 0);
  1414. ASSERT_EQ(value, 1);
  1415. #endif
  1416. }
  1417. /**
  1418. * Test AsyncServerSocket::removeAcceptCallback()
  1419. */
  1420. TEST(AsyncSocketTest, RemoveAcceptCallback) {
  1421. // Create a new AsyncServerSocket
  1422. EventBase eventBase;
  1423. std::shared_ptr<AsyncServerSocket> serverSocket(
  1424. AsyncServerSocket::newSocket(&eventBase));
  1425. serverSocket->bind(0);
  1426. serverSocket->listen(16);
  1427. folly::SocketAddress serverAddress;
  1428. serverSocket->getAddress(&serverAddress);
  1429. // Add several accept callbacks
  1430. TestAcceptCallback cb1;
  1431. TestAcceptCallback cb2;
  1432. TestAcceptCallback cb3;
  1433. TestAcceptCallback cb4;
  1434. TestAcceptCallback cb5;
  1435. TestAcceptCallback cb6;
  1436. TestAcceptCallback cb7;
  1437. // Test having callbacks remove other callbacks before them on the list,
  1438. // after them on the list, or removing themselves.
  1439. //
  1440. // Have callback 2 remove callback 3 and callback 5 the first time it is
  1441. // called.
  1442. int cb2Count = 0;
  1443. cb1.setConnectionAcceptedFn(
  1444. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1445. std::shared_ptr<AsyncSocket> sock2(AsyncSocket::newSocket(
  1446. &eventBase, serverAddress)); // cb2: -cb3 -cb5
  1447. });
  1448. cb3.setConnectionAcceptedFn(
  1449. [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
  1450. cb4.setConnectionAcceptedFn(
  1451. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1452. std::shared_ptr<AsyncSocket> sock3(
  1453. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
  1454. });
  1455. cb5.setConnectionAcceptedFn(
  1456. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1457. std::shared_ptr<AsyncSocket> sock5(
  1458. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
  1459. });
  1460. cb2.setConnectionAcceptedFn(
  1461. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1462. if (cb2Count == 0) {
  1463. serverSocket->removeAcceptCallback(&cb3, nullptr);
  1464. serverSocket->removeAcceptCallback(&cb5, nullptr);
  1465. }
  1466. ++cb2Count;
  1467. });
  1468. // Have callback 6 remove callback 4 the first time it is called,
  1469. // and destroy the server socket the second time it is called
  1470. int cb6Count = 0;
  1471. cb6.setConnectionAcceptedFn(
  1472. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1473. if (cb6Count == 0) {
  1474. serverSocket->removeAcceptCallback(&cb4, nullptr);
  1475. std::shared_ptr<AsyncSocket> sock6(
  1476. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
  1477. std::shared_ptr<AsyncSocket> sock7(
  1478. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
  1479. std::shared_ptr<AsyncSocket> sock8(
  1480. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
  1481. } else {
  1482. serverSocket.reset();
  1483. }
  1484. ++cb6Count;
  1485. });
  1486. // Have callback 7 remove itself
  1487. cb7.setConnectionAcceptedFn(
  1488. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1489. serverSocket->removeAcceptCallback(&cb7, nullptr);
  1490. });
  1491. serverSocket->addAcceptCallback(&cb1, &eventBase);
  1492. serverSocket->addAcceptCallback(&cb2, &eventBase);
  1493. serverSocket->addAcceptCallback(&cb3, &eventBase);
  1494. serverSocket->addAcceptCallback(&cb4, &eventBase);
  1495. serverSocket->addAcceptCallback(&cb5, &eventBase);
  1496. serverSocket->addAcceptCallback(&cb6, &eventBase);
  1497. serverSocket->addAcceptCallback(&cb7, &eventBase);
  1498. serverSocket->startAccepting();
  1499. // Make several connections to the socket
  1500. std::shared_ptr<AsyncSocket> sock1(
  1501. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
  1502. std::shared_ptr<AsyncSocket> sock4(
  1503. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
  1504. // Loop until we are stopped
  1505. eventBase.loop();
  1506. // Check to make sure that the expected callbacks were invoked.
  1507. //
  1508. // NOTE: This code depends on the AsyncServerSocket operating calling all of
  1509. // the AcceptCallbacks in round-robin fashion, in the order that they were
  1510. // added. The code is implemented this way right now, but the API doesn't
  1511. // explicitly require it be done this way. If we change the code not to be
  1512. // exactly round robin in the future, we can simplify the test checks here.
  1513. // (We'll also need to update the termination code, since we expect cb6 to
  1514. // get called twice to terminate the loop.)
  1515. ASSERT_EQ(cb1.getEvents()->size(), 4);
  1516. ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1517. ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1518. ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
  1519. ASSERT_EQ(cb1.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
  1520. ASSERT_EQ(cb2.getEvents()->size(), 4);
  1521. ASSERT_EQ(cb2.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1522. ASSERT_EQ(cb2.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1523. ASSERT_EQ(cb2.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
  1524. ASSERT_EQ(cb2.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
  1525. ASSERT_EQ(cb3.getEvents()->size(), 2);
  1526. ASSERT_EQ(cb3.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1527. ASSERT_EQ(cb3.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP);
  1528. ASSERT_EQ(cb4.getEvents()->size(), 3);
  1529. ASSERT_EQ(cb4.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1530. ASSERT_EQ(cb4.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1531. ASSERT_EQ(cb4.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1532. ASSERT_EQ(cb5.getEvents()->size(), 2);
  1533. ASSERT_EQ(cb5.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1534. ASSERT_EQ(cb5.getEvents()->at(1).type, TestAcceptCallback::TYPE_STOP);
  1535. ASSERT_EQ(cb6.getEvents()->size(), 4);
  1536. ASSERT_EQ(cb6.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1537. ASSERT_EQ(cb6.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1538. ASSERT_EQ(cb6.getEvents()->at(2).type, TestAcceptCallback::TYPE_ACCEPT);
  1539. ASSERT_EQ(cb6.getEvents()->at(3).type, TestAcceptCallback::TYPE_STOP);
  1540. ASSERT_EQ(cb7.getEvents()->size(), 3);
  1541. ASSERT_EQ(cb7.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1542. ASSERT_EQ(cb7.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1543. ASSERT_EQ(cb7.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1544. }
  1545. /**
  1546. * Test AsyncServerSocket::removeAcceptCallback()
  1547. */
  1548. TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
  1549. // Create a new AsyncServerSocket
  1550. EventBase eventBase;
  1551. std::shared_ptr<AsyncServerSocket> serverSocket(
  1552. AsyncServerSocket::newSocket(&eventBase));
  1553. serverSocket->bind(0);
  1554. serverSocket->listen(16);
  1555. folly::SocketAddress serverAddress;
  1556. serverSocket->getAddress(&serverAddress);
  1557. // Add several accept callbacks
  1558. TestAcceptCallback cb1;
  1559. auto thread_id = std::this_thread::get_id();
  1560. cb1.setAcceptStartedFn([&]() {
  1561. CHECK_NE(thread_id, std::this_thread::get_id());
  1562. thread_id = std::this_thread::get_id();
  1563. });
  1564. cb1.setConnectionAcceptedFn(
  1565. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1566. ASSERT_EQ(thread_id, std::this_thread::get_id());
  1567. serverSocket->removeAcceptCallback(&cb1, &eventBase);
  1568. });
  1569. cb1.setAcceptStoppedFn(
  1570. [&]() { ASSERT_EQ(thread_id, std::this_thread::get_id()); });
  1571. // Test having callbacks remove other callbacks before them on the list,
  1572. serverSocket->addAcceptCallback(&cb1, &eventBase);
  1573. serverSocket->startAccepting();
  1574. // Make several connections to the socket
  1575. std::shared_ptr<AsyncSocket> sock1(
  1576. AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
  1577. // Loop in another thread
  1578. auto other = std::thread([&]() { eventBase.loop(); });
  1579. other.join();
  1580. // Check to make sure that the expected callbacks were invoked.
  1581. //
  1582. // NOTE: This code depends on the AsyncServerSocket operating calling all of
  1583. // the AcceptCallbacks in round-robin fashion, in the order that they were
  1584. // added. The code is implemented this way right now, but the API doesn't
  1585. // explicitly require it be done this way. If we change the code not to be
  1586. // exactly round robin in the future, we can simplify the test checks here.
  1587. // (We'll also need to update the termination code, since we expect cb6 to
  1588. // get called twice to terminate the loop.)
  1589. ASSERT_EQ(cb1.getEvents()->size(), 3);
  1590. ASSERT_EQ(cb1.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1591. ASSERT_EQ(cb1.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1592. ASSERT_EQ(cb1.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1593. }
  1594. void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
  1595. EventBase* eventBase = serverSocket->getEventBase();
  1596. CHECK(eventBase);
  1597. // Add a callback to accept one connection then stop accepting
  1598. TestAcceptCallback acceptCallback;
  1599. acceptCallback.setConnectionAcceptedFn(
  1600. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1601. serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
  1602. });
  1603. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1604. serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
  1605. });
  1606. serverSocket->addAcceptCallback(&acceptCallback, eventBase);
  1607. serverSocket->startAccepting();
  1608. // Connect to the server socket
  1609. folly::SocketAddress serverAddress;
  1610. serverSocket->getAddress(&serverAddress);
  1611. AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
  1612. // Loop to process all events
  1613. eventBase->loop();
  1614. // Verify that the server accepted a connection
  1615. ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
  1616. ASSERT_EQ(
  1617. acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1618. ASSERT_EQ(
  1619. acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1620. ASSERT_EQ(
  1621. acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1622. }
  1623. /* Verify that we don't leak sockets if we are destroyed()
  1624. * and there are still writes pending
  1625. *
  1626. * If destroy() only calls close() instead of closeNow(),
  1627. * it would shutdown(writes) on the socket, but it would
  1628. * never be close()'d, and the socket would leak
  1629. */
  1630. TEST(AsyncSocketTest, DestroyCloseTest) {
  1631. TestServer server;
  1632. // connect()
  1633. EventBase clientEB;
  1634. EventBase serverEB;
  1635. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
  1636. ConnCallback ccb;
  1637. socket->connect(&ccb, server.getAddress(), 30);
  1638. // Accept the connection
  1639. std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
  1640. ReadCallback rcb;
  1641. acceptedSocket->setReadCB(&rcb);
  1642. // Write a large buffer to the socket that is larger than kernel buffer
  1643. size_t simpleBufLength = 5000000;
  1644. char* simpleBuf = new char[simpleBufLength];
  1645. memset(simpleBuf, 'a', simpleBufLength);
  1646. WriteCallback wcb;
  1647. // Let the reads and writes run to completion
  1648. int fd = acceptedSocket->getFd();
  1649. acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
  1650. socket.reset();
  1651. acceptedSocket.reset();
  1652. // Test that server socket was closed
  1653. folly::test::msvcSuppressAbortOnInvalidParams([&] {
  1654. ssize_t sz = read(fd, simpleBuf, simpleBufLength);
  1655. ASSERT_EQ(sz, -1);
  1656. ASSERT_EQ(errno, EBADF);
  1657. });
  1658. delete[] simpleBuf;
  1659. }
  1660. /**
  1661. * Test AsyncServerSocket::useExistingSocket()
  1662. */
  1663. TEST(AsyncSocketTest, ServerExistingSocket) {
  1664. EventBase eventBase;
  1665. // Test creating a socket, and letting AsyncServerSocket bind and listen
  1666. {
  1667. // Manually create a socket
  1668. int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1669. ASSERT_GE(fd, 0);
  1670. // Create a server socket
  1671. AsyncServerSocket::UniquePtr serverSocket(
  1672. new AsyncServerSocket(&eventBase));
  1673. serverSocket->useExistingSocket(fd);
  1674. folly::SocketAddress address;
  1675. serverSocket->getAddress(&address);
  1676. address.setPort(0);
  1677. serverSocket->bind(address);
  1678. serverSocket->listen(16);
  1679. // Make sure the socket works
  1680. serverSocketSanityTest(serverSocket.get());
  1681. }
  1682. // Test creating a socket and binding manually,
  1683. // then letting AsyncServerSocket listen
  1684. {
  1685. // Manually create a socket
  1686. int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1687. ASSERT_GE(fd, 0);
  1688. // bind
  1689. struct sockaddr_in addr;
  1690. addr.sin_family = AF_INET;
  1691. addr.sin_port = 0;
  1692. addr.sin_addr.s_addr = INADDR_ANY;
  1693. ASSERT_EQ(
  1694. bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
  1695. // Look up the address that we bound to
  1696. folly::SocketAddress boundAddress;
  1697. boundAddress.setFromLocalAddress(fd);
  1698. // Create a server socket
  1699. AsyncServerSocket::UniquePtr serverSocket(
  1700. new AsyncServerSocket(&eventBase));
  1701. serverSocket->useExistingSocket(fd);
  1702. serverSocket->listen(16);
  1703. // Make sure AsyncServerSocket reports the same address that we bound to
  1704. folly::SocketAddress serverSocketAddress;
  1705. serverSocket->getAddress(&serverSocketAddress);
  1706. ASSERT_EQ(boundAddress, serverSocketAddress);
  1707. // Make sure the socket works
  1708. serverSocketSanityTest(serverSocket.get());
  1709. }
  1710. // Test creating a socket, binding and listening manually,
  1711. // then giving it to AsyncServerSocket
  1712. {
  1713. // Manually create a socket
  1714. int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1715. ASSERT_GE(fd, 0);
  1716. // bind
  1717. struct sockaddr_in addr;
  1718. addr.sin_family = AF_INET;
  1719. addr.sin_port = 0;
  1720. addr.sin_addr.s_addr = INADDR_ANY;
  1721. ASSERT_EQ(
  1722. bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)), 0);
  1723. // Look up the address that we bound to
  1724. folly::SocketAddress boundAddress;
  1725. boundAddress.setFromLocalAddress(fd);
  1726. // listen
  1727. ASSERT_EQ(listen(fd, 16), 0);
  1728. // Create a server socket
  1729. AsyncServerSocket::UniquePtr serverSocket(
  1730. new AsyncServerSocket(&eventBase));
  1731. serverSocket->useExistingSocket(fd);
  1732. // Make sure AsyncServerSocket reports the same address that we bound to
  1733. folly::SocketAddress serverSocketAddress;
  1734. serverSocket->getAddress(&serverSocketAddress);
  1735. ASSERT_EQ(boundAddress, serverSocketAddress);
  1736. // Make sure the socket works
  1737. serverSocketSanityTest(serverSocket.get());
  1738. }
  1739. }
  1740. TEST(AsyncSocketTest, UnixDomainSocketTest) {
  1741. EventBase eventBase;
  1742. // Create a server socket
  1743. std::shared_ptr<AsyncServerSocket> serverSocket(
  1744. AsyncServerSocket::newSocket(&eventBase));
  1745. string path(1, 0);
  1746. path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
  1747. folly::SocketAddress serverAddress;
  1748. serverAddress.setFromPath(path);
  1749. serverSocket->bind(serverAddress);
  1750. serverSocket->listen(16);
  1751. // Add a callback to accept one connection then stop the loop
  1752. TestAcceptCallback acceptCallback;
  1753. acceptCallback.setConnectionAcceptedFn(
  1754. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1755. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  1756. });
  1757. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1758. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  1759. });
  1760. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  1761. serverSocket->startAccepting();
  1762. // Connect to the server socket
  1763. std::shared_ptr<AsyncSocket> socket(
  1764. AsyncSocket::newSocket(&eventBase, serverAddress));
  1765. eventBase.loop();
  1766. // Verify that the server accepted a connection
  1767. ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
  1768. ASSERT_EQ(
  1769. acceptCallback.getEvents()->at(0).type, TestAcceptCallback::TYPE_START);
  1770. ASSERT_EQ(
  1771. acceptCallback.getEvents()->at(1).type, TestAcceptCallback::TYPE_ACCEPT);
  1772. ASSERT_EQ(
  1773. acceptCallback.getEvents()->at(2).type, TestAcceptCallback::TYPE_STOP);
  1774. int fd = acceptCallback.getEvents()->at(1).fd;
  1775. // The accepted connection should already be in non-blocking mode
  1776. int flags = fcntl(fd, F_GETFL, 0);
  1777. ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
  1778. }
  1779. TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
  1780. EventBase eventBase;
  1781. TestConnectionEventCallback connectionEventCallback;
  1782. // Create a server socket
  1783. std::shared_ptr<AsyncServerSocket> serverSocket(
  1784. AsyncServerSocket::newSocket(&eventBase));
  1785. serverSocket->setConnectionEventCallback(&connectionEventCallback);
  1786. serverSocket->bind(0);
  1787. serverSocket->listen(16);
  1788. folly::SocketAddress serverAddress;
  1789. serverSocket->getAddress(&serverAddress);
  1790. // Add a callback to accept one connection then stop the loop
  1791. TestAcceptCallback acceptCallback;
  1792. acceptCallback.setConnectionAcceptedFn(
  1793. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1794. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1795. });
  1796. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1797. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1798. });
  1799. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  1800. serverSocket->startAccepting();
  1801. // Connect to the server socket
  1802. std::shared_ptr<AsyncSocket> socket(
  1803. AsyncSocket::newSocket(&eventBase, serverAddress));
  1804. eventBase.loop();
  1805. // Validate the connection event counters
  1806. ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
  1807. ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
  1808. ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
  1809. ASSERT_EQ(
  1810. connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
  1811. ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
  1812. ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
  1813. ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
  1814. ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
  1815. }
  1816. TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
  1817. EventBase eventBase;
  1818. TestConnectionEventCallback connectionEventCallback;
  1819. // Create a server socket
  1820. std::shared_ptr<AsyncServerSocket> serverSocket(
  1821. AsyncServerSocket::newSocket(&eventBase));
  1822. serverSocket->setConnectionEventCallback(&connectionEventCallback);
  1823. serverSocket->bind(0);
  1824. serverSocket->listen(16);
  1825. folly::SocketAddress serverAddress;
  1826. serverSocket->getAddress(&serverAddress);
  1827. // Add a callback to accept one connection then stop the loop
  1828. TestAcceptCallback acceptCallback;
  1829. acceptCallback.setConnectionAcceptedFn(
  1830. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1831. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1832. });
  1833. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1834. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1835. });
  1836. bool acceptStartedFlag{false};
  1837. acceptCallback.setAcceptStartedFn(
  1838. [&acceptStartedFlag]() { acceptStartedFlag = true; });
  1839. bool acceptStoppedFlag{false};
  1840. acceptCallback.setAcceptStoppedFn(
  1841. [&acceptStoppedFlag]() { acceptStoppedFlag = true; });
  1842. serverSocket->addAcceptCallback(&acceptCallback, nullptr);
  1843. serverSocket->startAccepting();
  1844. // Connect to the server socket
  1845. std::shared_ptr<AsyncSocket> socket(
  1846. AsyncSocket::newSocket(&eventBase, serverAddress));
  1847. eventBase.loop();
  1848. ASSERT_TRUE(acceptStartedFlag);
  1849. ASSERT_TRUE(acceptStoppedFlag);
  1850. // Validate the connection event counters
  1851. ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
  1852. ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
  1853. ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
  1854. ASSERT_EQ(
  1855. connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
  1856. ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
  1857. ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
  1858. ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
  1859. ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
  1860. }
  1861. TEST(AsyncSocketTest, CallbackInSecondaryEventBase) {
  1862. EventBase eventBase;
  1863. TestConnectionEventCallback connectionEventCallback;
  1864. // Create a server socket
  1865. std::shared_ptr<AsyncServerSocket> serverSocket(
  1866. AsyncServerSocket::newSocket(&eventBase));
  1867. serverSocket->setConnectionEventCallback(&connectionEventCallback);
  1868. serverSocket->bind(0);
  1869. serverSocket->listen(16);
  1870. SocketAddress serverAddress;
  1871. serverSocket->getAddress(&serverAddress);
  1872. // Add a callback to accept one connection then stop the loop
  1873. TestAcceptCallback acceptCallback;
  1874. ScopedEventBaseThread cobThread("ioworker_test");
  1875. acceptCallback.setConnectionAcceptedFn(
  1876. [&](int /* fd */, const SocketAddress& /* addr */) {
  1877. eventBase.runInEventBaseThread([&] {
  1878. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1879. });
  1880. });
  1881. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1882. eventBase.runInEventBaseThread(
  1883. [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
  1884. });
  1885. std::atomic<bool> acceptStartedFlag{false};
  1886. acceptCallback.setAcceptStartedFn([&]() { acceptStartedFlag = true; });
  1887. Baton<> acceptStoppedFlag;
  1888. acceptCallback.setAcceptStoppedFn([&]() { acceptStoppedFlag.post(); });
  1889. serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
  1890. serverSocket->startAccepting();
  1891. // Connect to the server socket
  1892. std::shared_ptr<AsyncSocket> socket(
  1893. AsyncSocket::newSocket(&eventBase, serverAddress));
  1894. eventBase.loop();
  1895. ASSERT_TRUE(acceptStoppedFlag.try_wait_for(std::chrono::seconds(1)));
  1896. ASSERT_TRUE(acceptStartedFlag);
  1897. // Validate the connection event counters
  1898. ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
  1899. ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
  1900. ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
  1901. ASSERT_EQ(
  1902. connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
  1903. ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
  1904. ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
  1905. ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
  1906. ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
  1907. }
  1908. /**
  1909. * Test AsyncServerSocket::getNumPendingMessagesInQueue()
  1910. */
  1911. TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
  1912. EventBase eventBase;
  1913. // Counter of how many connections have been accepted
  1914. int count = 0;
  1915. // Create a server socket
  1916. auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
  1917. serverSocket->bind(0);
  1918. serverSocket->listen(16);
  1919. folly::SocketAddress serverAddress;
  1920. serverSocket->getAddress(&serverAddress);
  1921. // Add a callback to accept connections
  1922. folly::ScopedEventBaseThread cobThread("ioworker_test");
  1923. TestAcceptCallback acceptCallback;
  1924. acceptCallback.setConnectionAcceptedFn(
  1925. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1926. count++;
  1927. eventBase.runInEventBaseThreadAndWait([&] {
  1928. ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
  1929. });
  1930. if (count == 4) {
  1931. eventBase.runInEventBaseThread([&] {
  1932. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1933. });
  1934. }
  1935. });
  1936. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1937. eventBase.runInEventBaseThread(
  1938. [&] { serverSocket->removeAcceptCallback(&acceptCallback, nullptr); });
  1939. });
  1940. serverSocket->addAcceptCallback(&acceptCallback, cobThread.getEventBase());
  1941. serverSocket->startAccepting();
  1942. // Connect to the server socket, 4 clients, there are 4 connections
  1943. auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
  1944. auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
  1945. auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
  1946. auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
  1947. eventBase.loop();
  1948. ASSERT_EQ(4, count);
  1949. }
  1950. TEST(AsyncSocketTest, ConnectionsStorm) {
  1951. enum class AcceptCobLocation {
  1952. Default,
  1953. Primary,
  1954. Secondary,
  1955. };
  1956. auto testFunc = [](AcceptCobLocation mode) {
  1957. EventBase eventBase;
  1958. // Counter of how many connections have been accepted
  1959. std::atomic<size_t> count{0};
  1960. // Create a server socket
  1961. auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
  1962. serverSocket->bind(0);
  1963. serverSocket->listen(100);
  1964. folly::SocketAddress serverAddress;
  1965. serverSocket->getAddress(&serverAddress);
  1966. TestConnectionEventCallback connectionEventCallback;
  1967. serverSocket->setConnectionEventCallback(&connectionEventCallback);
  1968. // Add a callback to accept connections
  1969. std::shared_ptr<ScopedEventBaseThread> thread;
  1970. TestAcceptCallback acceptCallback;
  1971. bool stopAccepting = false;
  1972. const size_t maxSockets = 2000;
  1973. acceptCallback.setConnectionAcceptedFn(
  1974. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  1975. count++;
  1976. if (!stopAccepting &&
  1977. (count == maxSockets ||
  1978. connectionEventCallback.getConnectionDropped() > 0)) {
  1979. stopAccepting = true;
  1980. eventBase.runInEventBaseThread([&] {
  1981. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1982. });
  1983. }
  1984. });
  1985. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  1986. eventBase.runInEventBaseThread([&] {
  1987. stopAccepting = true;
  1988. serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
  1989. });
  1990. });
  1991. if (mode == AcceptCobLocation::Default) {
  1992. serverSocket->addAcceptCallback(&acceptCallback, nullptr);
  1993. } else if (mode == AcceptCobLocation::Primary) {
  1994. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  1995. } else if (mode == AcceptCobLocation::Secondary) {
  1996. thread = std::make_shared<ScopedEventBaseThread>();
  1997. serverSocket->addAcceptCallback(&acceptCallback, thread->getEventBase());
  1998. }
  1999. serverSocket->startAccepting();
  2000. // Create connection storm to create connections fast but
  2001. // also pace it to not overflow servers' listening queue.
  2002. vector<std::shared_ptr<AsyncSocket>> sockets;
  2003. folly::Function<void()> fnOpenSockets = [&]() {
  2004. // Counter of connections pending the invocation of accept callback.
  2005. auto pending = serverSocket->getNumPendingMessagesInQueue();
  2006. while (sockets.size() < std::min(maxSockets, pending + count + 30)) {
  2007. auto socket = folly::AsyncSocket::newSocket(&eventBase);
  2008. socket->connect(nullptr, serverAddress, 5000);
  2009. sockets.push_back(socket);
  2010. }
  2011. if (sockets.size() < maxSockets && !stopAccepting) {
  2012. eventBase.runInEventBaseThread([&] { fnOpenSockets(); });
  2013. }
  2014. };
  2015. eventBase.runInEventBaseThread([&] { fnOpenSockets(); });
  2016. eventBase.loop();
  2017. ASSERT_EQ(maxSockets, count);
  2018. };
  2019. testFunc(AcceptCobLocation::Default);
  2020. testFunc(AcceptCobLocation::Primary);
  2021. testFunc(AcceptCobLocation::Secondary);
  2022. }
  2023. /**
  2024. * Test AsyncTransport::BufferCallback
  2025. */
  2026. TEST(AsyncSocketTest, BufferTest) {
  2027. TestServer server;
  2028. EventBase evb;
  2029. AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
  2030. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2031. ConnCallback ccb;
  2032. socket->connect(&ccb, server.getAddress(), 30, option);
  2033. char buf[100 * 1024];
  2034. memset(buf, 'c', sizeof(buf));
  2035. WriteCallback wcb;
  2036. BufferCallback bcb;
  2037. socket->setBufferCallback(&bcb);
  2038. socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
  2039. evb.loop();
  2040. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2041. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  2042. ASSERT_TRUE(bcb.hasBuffered());
  2043. ASSERT_TRUE(bcb.hasBufferCleared());
  2044. socket->close();
  2045. server.verifyConnection(buf, sizeof(buf));
  2046. ASSERT_TRUE(socket->isClosedBySelf());
  2047. ASSERT_FALSE(socket->isClosedByPeer());
  2048. }
  2049. TEST(AsyncSocketTest, BufferCallbackKill) {
  2050. TestServer server;
  2051. EventBase evb;
  2052. AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
  2053. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2054. ConnCallback ccb;
  2055. socket->connect(&ccb, server.getAddress(), 30, option);
  2056. evb.loopOnce();
  2057. char buf[100 * 1024];
  2058. memset(buf, 'c', sizeof(buf));
  2059. BufferCallback bcb;
  2060. socket->setBufferCallback(&bcb);
  2061. WriteCallback wcb;
  2062. wcb.successCallback = [&] {
  2063. ASSERT_TRUE(socket.unique());
  2064. socket.reset();
  2065. };
  2066. // This will trigger AsyncSocket::handleWrite,
  2067. // which calls WriteCallback::writeSuccess,
  2068. // which calls wcb.successCallback above,
  2069. // which tries to delete socket
  2070. // Then, the socket will also try to use this BufferCallback
  2071. // And that should crash us, if there is no DestructorGuard on the stack
  2072. socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
  2073. evb.loop();
  2074. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2075. }
  2076. #if FOLLY_ALLOW_TFO
  2077. TEST(AsyncSocketTest, ConnectTFO) {
  2078. // Start listening on a local port
  2079. TestServer server(true);
  2080. // Connect using a AsyncSocket
  2081. EventBase evb;
  2082. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2083. socket->enableTFO();
  2084. ConnCallback cb;
  2085. socket->connect(&cb, server.getAddress(), 30);
  2086. std::array<uint8_t, 128> buf;
  2087. memset(buf.data(), 'a', buf.size());
  2088. std::array<uint8_t, 3> readBuf;
  2089. auto sendBuf = IOBuf::copyBuffer("hey");
  2090. std::thread t([&] {
  2091. auto acceptedSocket = server.accept();
  2092. acceptedSocket->write(buf.data(), buf.size());
  2093. acceptedSocket->flush();
  2094. acceptedSocket->readAll(readBuf.data(), readBuf.size());
  2095. acceptedSocket->close();
  2096. });
  2097. evb.loop();
  2098. ASSERT_EQ(cb.state, STATE_SUCCEEDED);
  2099. EXPECT_LE(0, socket->getConnectTime().count());
  2100. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  2101. EXPECT_TRUE(socket->getTFOAttempted());
  2102. // Should trigger the connect
  2103. WriteCallback write;
  2104. ReadCallback rcb;
  2105. socket->writeChain(&write, sendBuf->clone());
  2106. socket->setReadCB(&rcb);
  2107. evb.loop();
  2108. t.join();
  2109. EXPECT_EQ(STATE_SUCCEEDED, write.state);
  2110. EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
  2111. EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
  2112. ASSERT_EQ(1, rcb.buffers.size());
  2113. ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
  2114. EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
  2115. EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
  2116. }
  2117. TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
  2118. // Start listening on a local port
  2119. TestServer server(true);
  2120. // Connect using a AsyncSocket
  2121. EventBase evb;
  2122. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2123. socket->enableTFO();
  2124. ConnCallback cb;
  2125. socket->connect(&cb, server.getAddress(), 30);
  2126. ReadCallback rcb;
  2127. socket->setReadCB(&rcb);
  2128. std::array<uint8_t, 128> buf;
  2129. memset(buf.data(), 'a', buf.size());
  2130. std::array<uint8_t, 3> readBuf;
  2131. auto sendBuf = IOBuf::copyBuffer("hey");
  2132. std::thread t([&] {
  2133. auto acceptedSocket = server.accept();
  2134. acceptedSocket->write(buf.data(), buf.size());
  2135. acceptedSocket->flush();
  2136. acceptedSocket->readAll(readBuf.data(), readBuf.size());
  2137. acceptedSocket->close();
  2138. });
  2139. evb.loop();
  2140. ASSERT_EQ(cb.state, STATE_SUCCEEDED);
  2141. EXPECT_LE(0, socket->getConnectTime().count());
  2142. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  2143. EXPECT_TRUE(socket->getTFOAttempted());
  2144. // Should trigger the connect
  2145. WriteCallback write;
  2146. socket->writeChain(&write, sendBuf->clone());
  2147. evb.loop();
  2148. t.join();
  2149. EXPECT_EQ(STATE_SUCCEEDED, write.state);
  2150. EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
  2151. EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
  2152. ASSERT_EQ(1, rcb.buffers.size());
  2153. ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
  2154. EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
  2155. EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
  2156. }
  2157. /**
  2158. * Test connecting to a server that isn't listening
  2159. */
  2160. TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
  2161. EventBase evb;
  2162. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2163. socket->enableTFO();
  2164. // Hopefully nothing is actually listening on this address
  2165. folly::SocketAddress addr("::1", 65535);
  2166. ConnCallback cb;
  2167. socket->connect(&cb, addr, 30);
  2168. evb.loop();
  2169. WriteCallback write1;
  2170. // Trigger the connect if TFO attempt is supported.
  2171. socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
  2172. WriteCallback write2;
  2173. socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
  2174. evb.loop();
  2175. if (!socket->getTFOFinished()) {
  2176. EXPECT_EQ(STATE_FAILED, write1.state);
  2177. } else {
  2178. EXPECT_EQ(STATE_SUCCEEDED, write1.state);
  2179. EXPECT_FALSE(socket->getTFOSucceded());
  2180. }
  2181. EXPECT_EQ(STATE_FAILED, write2.state);
  2182. EXPECT_EQ(STATE_SUCCEEDED, cb.state);
  2183. EXPECT_LE(0, socket->getConnectTime().count());
  2184. EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
  2185. EXPECT_TRUE(socket->getTFOAttempted());
  2186. }
  2187. /**
  2188. * Test calling closeNow() immediately after connecting.
  2189. */
  2190. TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
  2191. TestServer server(true);
  2192. // connect()
  2193. EventBase evb;
  2194. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2195. socket->enableTFO();
  2196. ConnCallback ccb;
  2197. socket->connect(&ccb, server.getAddress(), 30);
  2198. // write()
  2199. std::array<char, 128> buf;
  2200. memset(buf.data(), 'a', buf.size());
  2201. // close()
  2202. socket->closeNow();
  2203. // Loop, although there shouldn't be anything to do.
  2204. evb.loop();
  2205. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2206. ASSERT_TRUE(socket->isClosedBySelf());
  2207. ASSERT_FALSE(socket->isClosedByPeer());
  2208. }
  2209. /**
  2210. * Test calling close() immediately after connect()
  2211. */
  2212. TEST(AsyncSocketTest, ConnectAndCloseTFO) {
  2213. TestServer server(true);
  2214. // Connect using a AsyncSocket
  2215. EventBase evb;
  2216. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2217. socket->enableTFO();
  2218. ConnCallback ccb;
  2219. socket->connect(&ccb, server.getAddress(), 30);
  2220. socket->close();
  2221. // Loop, although there shouldn't be anything to do.
  2222. evb.loop();
  2223. // Make sure the connection was aborted
  2224. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2225. ASSERT_TRUE(socket->isClosedBySelf());
  2226. ASSERT_FALSE(socket->isClosedByPeer());
  2227. }
  2228. class MockAsyncTFOSocket : public AsyncSocket {
  2229. public:
  2230. using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
  2231. explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
  2232. MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
  2233. };
  2234. TEST(AsyncSocketTest, TestTFOUnsupported) {
  2235. TestServer server(true);
  2236. // Connect using a AsyncSocket
  2237. EventBase evb;
  2238. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2239. socket->enableTFO();
  2240. ConnCallback ccb;
  2241. socket->connect(&ccb, server.getAddress(), 30);
  2242. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2243. ReadCallback rcb;
  2244. socket->setReadCB(&rcb);
  2245. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2246. .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
  2247. WriteCallback write;
  2248. auto sendBuf = IOBuf::copyBuffer("hey");
  2249. socket->writeChain(&write, sendBuf->clone());
  2250. EXPECT_EQ(STATE_WAITING, write.state);
  2251. std::array<uint8_t, 128> buf;
  2252. memset(buf.data(), 'a', buf.size());
  2253. std::array<uint8_t, 3> readBuf;
  2254. std::thread t([&] {
  2255. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  2256. acceptedSocket->write(buf.data(), buf.size());
  2257. acceptedSocket->flush();
  2258. acceptedSocket->readAll(readBuf.data(), readBuf.size());
  2259. acceptedSocket->close();
  2260. });
  2261. evb.loop();
  2262. t.join();
  2263. EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
  2264. EXPECT_EQ(STATE_SUCCEEDED, write.state);
  2265. EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
  2266. EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
  2267. ASSERT_EQ(1, rcb.buffers.size());
  2268. ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
  2269. EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
  2270. EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
  2271. }
  2272. TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
  2273. EventBase evb;
  2274. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2275. socket->enableTFO();
  2276. // Hopefully this fails
  2277. folly::SocketAddress fakeAddr("127.0.0.1", 65535);
  2278. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2279. .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
  2280. sockaddr_storage addr;
  2281. auto len = fakeAddr.getAddress(&addr);
  2282. int ret = connect(fd, (const struct sockaddr*)&addr, len);
  2283. LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
  2284. << errno;
  2285. return ret;
  2286. }));
  2287. // Hopefully nothing is actually listening on this address
  2288. ConnCallback cb;
  2289. socket->connect(&cb, fakeAddr, 30);
  2290. WriteCallback write1;
  2291. // Trigger the connect if TFO attempt is supported.
  2292. socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
  2293. if (socket->getTFOFinished()) {
  2294. // This test is useless now.
  2295. return;
  2296. }
  2297. WriteCallback write2;
  2298. // Trigger the connect if TFO attempt is supported.
  2299. socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
  2300. evb.loop();
  2301. EXPECT_EQ(STATE_FAILED, write1.state);
  2302. EXPECT_EQ(STATE_FAILED, write2.state);
  2303. EXPECT_FALSE(socket->getTFOSucceded());
  2304. EXPECT_EQ(STATE_SUCCEEDED, cb.state);
  2305. EXPECT_LE(0, socket->getConnectTime().count());
  2306. EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
  2307. EXPECT_TRUE(socket->getTFOAttempted());
  2308. }
  2309. TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
  2310. // Try connecting to server that won't respond.
  2311. //
  2312. // This depends somewhat on the network where this test is run.
  2313. // Hopefully this IP will be routable but unresponsive.
  2314. // (Alternatively, we could try listening on a local raw socket, but that
  2315. // normally requires root privileges.)
  2316. auto host = SocketAddressTestHelper::isIPv6Enabled()
  2317. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
  2318. : SocketAddressTestHelper::isIPv4Enabled()
  2319. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
  2320. : nullptr;
  2321. SocketAddress addr(host, 65535);
  2322. // Connect using a AsyncSocket
  2323. EventBase evb;
  2324. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2325. socket->enableTFO();
  2326. ConnCallback ccb;
  2327. // Set a very small timeout
  2328. socket->connect(&ccb, addr, 1);
  2329. EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
  2330. ReadCallback rcb;
  2331. socket->setReadCB(&rcb);
  2332. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2333. .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
  2334. WriteCallback write;
  2335. socket->writeChain(&write, IOBuf::copyBuffer("hey"));
  2336. evb.loop();
  2337. EXPECT_EQ(STATE_FAILED, write.state);
  2338. }
  2339. TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
  2340. TestServer server(true);
  2341. // Connect using a AsyncSocket
  2342. EventBase evb;
  2343. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2344. socket->enableTFO();
  2345. ConnCallback ccb;
  2346. socket->connect(&ccb, server.getAddress(), 30);
  2347. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2348. ReadCallback rcb;
  2349. socket->setReadCB(&rcb);
  2350. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2351. .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
  2352. sockaddr_storage addr;
  2353. auto len = server.getAddress().getAddress(&addr);
  2354. return connect(fd, (const struct sockaddr*)&addr, len);
  2355. }));
  2356. WriteCallback write;
  2357. auto sendBuf = IOBuf::copyBuffer("hey");
  2358. socket->writeChain(&write, sendBuf->clone());
  2359. EXPECT_EQ(STATE_WAITING, write.state);
  2360. std::array<uint8_t, 128> buf;
  2361. memset(buf.data(), 'a', buf.size());
  2362. std::array<uint8_t, 3> readBuf;
  2363. std::thread t([&] {
  2364. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  2365. acceptedSocket->write(buf.data(), buf.size());
  2366. acceptedSocket->flush();
  2367. acceptedSocket->readAll(readBuf.data(), readBuf.size());
  2368. acceptedSocket->close();
  2369. });
  2370. evb.loop();
  2371. t.join();
  2372. EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
  2373. EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
  2374. EXPECT_EQ(STATE_SUCCEEDED, write.state);
  2375. EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
  2376. ASSERT_EQ(1, rcb.buffers.size());
  2377. ASSERT_EQ(buf.size(), rcb.buffers[0].length);
  2378. EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
  2379. }
  2380. TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
  2381. // Try connecting to server that won't respond.
  2382. //
  2383. // This depends somewhat on the network where this test is run.
  2384. // Hopefully this IP will be routable but unresponsive.
  2385. // (Alternatively, we could try listening on a local raw socket, but that
  2386. // normally requires root privileges.)
  2387. auto host = SocketAddressTestHelper::isIPv6Enabled()
  2388. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
  2389. : SocketAddressTestHelper::isIPv4Enabled()
  2390. ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
  2391. : nullptr;
  2392. SocketAddress addr(host, 65535);
  2393. // Connect using a AsyncSocket
  2394. EventBase evb;
  2395. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2396. socket->enableTFO();
  2397. ConnCallback ccb;
  2398. // Set a very small timeout
  2399. socket->connect(&ccb, addr, 1);
  2400. EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
  2401. ReadCallback rcb;
  2402. socket->setReadCB(&rcb);
  2403. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2404. .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
  2405. sockaddr_storage addr2;
  2406. auto len = addr.getAddress(&addr2);
  2407. return connect(fd, (const struct sockaddr*)&addr2, len);
  2408. }));
  2409. WriteCallback write;
  2410. socket->writeChain(&write, IOBuf::copyBuffer("hey"));
  2411. evb.loop();
  2412. EXPECT_EQ(STATE_FAILED, write.state);
  2413. }
  2414. TEST(AsyncSocketTest, TestTFOEagain) {
  2415. TestServer server(true);
  2416. // Connect using a AsyncSocket
  2417. EventBase evb;
  2418. auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
  2419. socket->enableTFO();
  2420. ConnCallback ccb;
  2421. socket->connect(&ccb, server.getAddress(), 30);
  2422. EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
  2423. .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
  2424. WriteCallback write;
  2425. socket->writeChain(&write, IOBuf::copyBuffer("hey"));
  2426. evb.loop();
  2427. EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
  2428. EXPECT_EQ(STATE_FAILED, write.state);
  2429. }
  2430. // Sending a large amount of data in the first write which will
  2431. // definitely not fit into MSS.
  2432. TEST(AsyncSocketTest, ConnectTFOWithBigData) {
  2433. // Start listening on a local port
  2434. TestServer server(true);
  2435. // Connect using a AsyncSocket
  2436. EventBase evb;
  2437. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2438. socket->enableTFO();
  2439. ConnCallback cb;
  2440. socket->connect(&cb, server.getAddress(), 30);
  2441. std::array<uint8_t, 128> buf;
  2442. memset(buf.data(), 'a', buf.size());
  2443. constexpr size_t len = 10 * 1024;
  2444. auto sendBuf = IOBuf::create(len);
  2445. sendBuf->append(len);
  2446. std::array<uint8_t, len> readBuf;
  2447. std::thread t([&] {
  2448. auto acceptedSocket = server.accept();
  2449. acceptedSocket->write(buf.data(), buf.size());
  2450. acceptedSocket->flush();
  2451. acceptedSocket->readAll(readBuf.data(), readBuf.size());
  2452. acceptedSocket->close();
  2453. });
  2454. evb.loop();
  2455. ASSERT_EQ(cb.state, STATE_SUCCEEDED);
  2456. EXPECT_LE(0, socket->getConnectTime().count());
  2457. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  2458. EXPECT_TRUE(socket->getTFOAttempted());
  2459. // Should trigger the connect
  2460. WriteCallback write;
  2461. ReadCallback rcb;
  2462. socket->writeChain(&write, sendBuf->clone());
  2463. socket->setReadCB(&rcb);
  2464. evb.loop();
  2465. t.join();
  2466. EXPECT_EQ(STATE_SUCCEEDED, write.state);
  2467. EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
  2468. EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
  2469. ASSERT_EQ(1, rcb.buffers.size());
  2470. ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
  2471. EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
  2472. EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
  2473. }
  2474. #endif // FOLLY_ALLOW_TFO
  2475. class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
  2476. public:
  2477. MOCK_METHOD1(evbAttached, void(AsyncSocket*));
  2478. MOCK_METHOD1(evbDetached, void(AsyncSocket*));
  2479. };
  2480. TEST(AsyncSocketTest, EvbCallbacks) {
  2481. auto cb = std::make_unique<MockEvbChangeCallback>();
  2482. EventBase evb;
  2483. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2484. InSequence seq;
  2485. EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
  2486. EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
  2487. socket->setEvbChangedCallback(std::move(cb));
  2488. socket->detachEventBase();
  2489. socket->attachEventBase(&evb);
  2490. }
  2491. TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
  2492. // Start listening on a local port
  2493. TestServer server;
  2494. // Connect using a AsyncSocket
  2495. EventBase evb;
  2496. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2497. ConnCallback cb;
  2498. socket->connect(&cb, server.getAddress(), 30);
  2499. evb.loop();
  2500. ASSERT_EQ(cb.state, STATE_SUCCEEDED);
  2501. EXPECT_LE(0, socket->getConnectTime().count());
  2502. EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
  2503. // After the ioHandlers are registered, still should be able to detach/attach
  2504. ReadCallback rcb;
  2505. socket->setReadCB(&rcb);
  2506. auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
  2507. InSequence seq;
  2508. EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
  2509. EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
  2510. socket->setEvbChangedCallback(std::move(cbEvbChg));
  2511. EXPECT_TRUE(socket->isDetachable());
  2512. socket->detachEventBase();
  2513. socket->attachEventBase(&evb);
  2514. socket->close();
  2515. }
  2516. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  2517. /* copied from include/uapi/linux/net_tstamp.h */
  2518. /* SO_TIMESTAMPING gets an integer bit field comprised of these values */
  2519. enum SOF_TIMESTAMPING {
  2520. SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
  2521. SOF_TIMESTAMPING_OPT_ID = (1 << 7),
  2522. SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
  2523. SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
  2524. SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
  2525. };
  2526. class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
  2527. public:
  2528. TestErrMessageCallback()
  2529. : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
  2530. void errMessage(const cmsghdr& cmsg) noexcept override {
  2531. if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
  2532. gotTimestamp_++;
  2533. checkResetCallback();
  2534. } else if (
  2535. (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
  2536. (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
  2537. gotByteSeq_++;
  2538. checkResetCallback();
  2539. }
  2540. }
  2541. void errMessageError(
  2542. const folly::AsyncSocketException& ex) noexcept override {
  2543. exception_ = ex;
  2544. }
  2545. void checkResetCallback() noexcept {
  2546. if (socket_ != nullptr && resetAfter_ != -1 &&
  2547. gotTimestamp_ + gotByteSeq_ == resetAfter_) {
  2548. socket_->setErrMessageCB(nullptr);
  2549. }
  2550. }
  2551. folly::AsyncSocket* socket_{nullptr};
  2552. folly::AsyncSocketException exception_;
  2553. int gotTimestamp_{0};
  2554. int gotByteSeq_{0};
  2555. int resetAfter_{-1};
  2556. };
  2557. TEST(AsyncSocketTest, ErrMessageCallback) {
  2558. TestServer server;
  2559. // connect()
  2560. EventBase evb;
  2561. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2562. ConnCallback ccb;
  2563. socket->connect(&ccb, server.getAddress(), 30);
  2564. LOG(INFO) << "Client socket fd=" << socket->getFd();
  2565. // Let the socket
  2566. evb.loop();
  2567. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2568. // Set read callback to keep the socket subscribed for event
  2569. // notifications. Though we're no planning to read anything from
  2570. // this side of the connection.
  2571. ReadCallback rcb(1);
  2572. socket->setReadCB(&rcb);
  2573. // Set up timestamp callbacks
  2574. TestErrMessageCallback errMsgCB;
  2575. socket->setErrMessageCB(&errMsgCB);
  2576. ASSERT_EQ(
  2577. socket->getErrMessageCallback(),
  2578. static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
  2579. errMsgCB.socket_ = socket.get();
  2580. errMsgCB.resetAfter_ = 3;
  2581. // Enable timestamp notifications
  2582. ASSERT_GT(socket->getFd(), 0);
  2583. int flags = SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY |
  2584. SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_CMSG |
  2585. SOF_TIMESTAMPING_TX_SCHED;
  2586. AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
  2587. EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
  2588. // write()
  2589. std::vector<uint8_t> wbuf(128, 'a');
  2590. WriteCallback wcb;
  2591. // Send two packets to get two EOM notifications
  2592. socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
  2593. socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
  2594. // Accept the connection.
  2595. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  2596. LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
  2597. // Loop
  2598. evb.loopOnce();
  2599. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  2600. // Check that we can read the data that was written to the socket
  2601. std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
  2602. uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
  2603. ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
  2604. ASSERT_EQ(bytesRead, wbuf.size());
  2605. // Close both sockets
  2606. acceptedSocket->close();
  2607. socket->close();
  2608. ASSERT_TRUE(socket->isClosedBySelf());
  2609. ASSERT_FALSE(socket->isClosedByPeer());
  2610. // Check for the timestamp notifications.
  2611. ASSERT_EQ(
  2612. errMsgCB.exception_.getType(), folly::AsyncSocketException::UNKNOWN);
  2613. ASSERT_GT(errMsgCB.gotByteSeq_, 0);
  2614. ASSERT_GT(errMsgCB.gotTimestamp_, 0);
  2615. ASSERT_EQ(
  2616. errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
  2617. }
  2618. #endif // FOLLY_HAVE_MSG_ERRQUEUE
  2619. TEST(AsyncSocket, PreReceivedData) {
  2620. TestServer server;
  2621. EventBase evb;
  2622. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2623. socket->connect(nullptr, server.getAddress(), 30);
  2624. evb.loop();
  2625. socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
  2626. auto acceptedSocket = server.acceptAsync(&evb);
  2627. ReadCallback peekCallback(2);
  2628. ReadCallback readCallback;
  2629. peekCallback.dataAvailableCallback = [&]() {
  2630. peekCallback.verifyData("he", 2);
  2631. acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
  2632. acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
  2633. acceptedSocket->setReadCB(nullptr);
  2634. acceptedSocket->setReadCB(&readCallback);
  2635. };
  2636. readCallback.dataAvailableCallback = [&]() {
  2637. if (readCallback.dataRead() == 5) {
  2638. readCallback.verifyData("hello", 5);
  2639. acceptedSocket->setReadCB(nullptr);
  2640. }
  2641. };
  2642. acceptedSocket->setReadCB(&peekCallback);
  2643. evb.loop();
  2644. }
  2645. TEST(AsyncSocket, PreReceivedDataOnly) {
  2646. TestServer server;
  2647. EventBase evb;
  2648. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2649. socket->connect(nullptr, server.getAddress(), 30);
  2650. evb.loop();
  2651. socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
  2652. auto acceptedSocket = server.acceptAsync(&evb);
  2653. ReadCallback peekCallback;
  2654. ReadCallback readCallback;
  2655. peekCallback.dataAvailableCallback = [&]() {
  2656. peekCallback.verifyData("hello", 5);
  2657. acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
  2658. acceptedSocket->setReadCB(&readCallback);
  2659. };
  2660. readCallback.dataAvailableCallback = [&]() {
  2661. readCallback.verifyData("hello", 5);
  2662. acceptedSocket->setReadCB(nullptr);
  2663. };
  2664. acceptedSocket->setReadCB(&peekCallback);
  2665. evb.loop();
  2666. }
  2667. TEST(AsyncSocket, PreReceivedDataPartial) {
  2668. TestServer server;
  2669. EventBase evb;
  2670. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2671. socket->connect(nullptr, server.getAddress(), 30);
  2672. evb.loop();
  2673. socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
  2674. auto acceptedSocket = server.acceptAsync(&evb);
  2675. ReadCallback peekCallback;
  2676. ReadCallback smallReadCallback(3);
  2677. ReadCallback normalReadCallback;
  2678. peekCallback.dataAvailableCallback = [&]() {
  2679. peekCallback.verifyData("hello", 5);
  2680. acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
  2681. acceptedSocket->setReadCB(&smallReadCallback);
  2682. };
  2683. smallReadCallback.dataAvailableCallback = [&]() {
  2684. smallReadCallback.verifyData("hel", 3);
  2685. acceptedSocket->setReadCB(&normalReadCallback);
  2686. };
  2687. normalReadCallback.dataAvailableCallback = [&]() {
  2688. normalReadCallback.verifyData("lo", 2);
  2689. acceptedSocket->setReadCB(nullptr);
  2690. };
  2691. acceptedSocket->setReadCB(&peekCallback);
  2692. evb.loop();
  2693. }
  2694. TEST(AsyncSocket, PreReceivedDataTakeover) {
  2695. TestServer server;
  2696. EventBase evb;
  2697. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2698. socket->connect(nullptr, server.getAddress(), 30);
  2699. evb.loop();
  2700. socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
  2701. auto acceptedSocket =
  2702. AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
  2703. AsyncSocket::UniquePtr takeoverSocket;
  2704. ReadCallback peekCallback(3);
  2705. ReadCallback readCallback;
  2706. peekCallback.dataAvailableCallback = [&]() {
  2707. peekCallback.verifyData("hel", 3);
  2708. acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
  2709. acceptedSocket->setReadCB(nullptr);
  2710. takeoverSocket =
  2711. AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
  2712. takeoverSocket->setReadCB(&readCallback);
  2713. };
  2714. readCallback.dataAvailableCallback = [&]() {
  2715. readCallback.verifyData("hello", 5);
  2716. takeoverSocket->setReadCB(nullptr);
  2717. };
  2718. acceptedSocket->setReadCB(&peekCallback);
  2719. evb.loop();
  2720. }
  2721. #ifdef MSG_NOSIGNAL
  2722. TEST(AsyncSocketTest, SendMessageFlags) {
  2723. TestServer server;
  2724. TestSendMsgParamsCallback sendMsgCB(
  2725. MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE, 0, nullptr);
  2726. // connect()
  2727. EventBase evb;
  2728. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
  2729. ConnCallback ccb;
  2730. socket->connect(&ccb, server.getAddress(), 30);
  2731. std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
  2732. evb.loop();
  2733. ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
  2734. // Set SendMsgParamsCallback
  2735. socket->setSendMsgParamCB(&sendMsgCB);
  2736. ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
  2737. // Write the first portion of data. This data is expected to be
  2738. // sent out immediately.
  2739. std::vector<uint8_t> buf(128, 'a');
  2740. WriteCallback wcb;
  2741. sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
  2742. socket->write(&wcb, buf.data(), buf.size());
  2743. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  2744. ASSERT_TRUE(sendMsgCB.queriedFlags_);
  2745. ASSERT_FALSE(sendMsgCB.queriedData_);
  2746. // Using different flags for the second write operation.
  2747. // MSG_MORE flag is expected to delay sending this
  2748. // data to the wire.
  2749. sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
  2750. socket->write(&wcb, buf.data(), buf.size());
  2751. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  2752. ASSERT_TRUE(sendMsgCB.queriedFlags_);
  2753. ASSERT_FALSE(sendMsgCB.queriedData_);
  2754. // Make sure the accepted socket saw only the data from
  2755. // the first write request.
  2756. std::vector<uint8_t> readbuf(2 * buf.size());
  2757. uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
  2758. ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
  2759. ASSERT_EQ(bytesRead, buf.size());
  2760. // Make sure the server got a connection and received the data
  2761. acceptedSocket->close();
  2762. socket->close();
  2763. ASSERT_TRUE(socket->isClosedBySelf());
  2764. ASSERT_FALSE(socket->isClosedByPeer());
  2765. }
  2766. TEST(AsyncSocketTest, SendMessageAncillaryData) {
  2767. int fds[2];
  2768. EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
  2769. // "Client" socket
  2770. int cfd = fds[0];
  2771. ASSERT_NE(cfd, -1);
  2772. // "Server" socket
  2773. int sfd = fds[1];
  2774. ASSERT_NE(sfd, -1);
  2775. SCOPE_EXIT {
  2776. close(sfd);
  2777. };
  2778. // Instantiate AsyncSocket object for the connected socket
  2779. EventBase evb;
  2780. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
  2781. // Open a temporary file and write a magic string to it
  2782. // We'll transfer the file handle to test the message parameters
  2783. // callback logic.
  2784. TemporaryFile file(
  2785. StringPiece(), fs::path(), TemporaryFile::Scope::UNLINK_IMMEDIATELY);
  2786. int tmpfd = file.fd();
  2787. ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
  2788. std::string magicString("Magic string");
  2789. ASSERT_EQ(
  2790. write(tmpfd, magicString.c_str(), magicString.length()),
  2791. magicString.length());
  2792. // Send message
  2793. union {
  2794. // Space large enough to hold an 'int'
  2795. char control[CMSG_SPACE(sizeof(int))];
  2796. struct cmsghdr cmh;
  2797. } s_u;
  2798. s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
  2799. s_u.cmh.cmsg_level = SOL_SOCKET;
  2800. s_u.cmh.cmsg_type = SCM_RIGHTS;
  2801. memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
  2802. // Set up the callback providing message parameters
  2803. TestSendMsgParamsCallback sendMsgCB(
  2804. MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
  2805. socket->setSendMsgParamCB(&sendMsgCB);
  2806. // We must transmit at least 1 byte of real data in order
  2807. // to send ancillary data
  2808. int s_data = 12345;
  2809. WriteCallback wcb;
  2810. socket->write(&wcb, &s_data, sizeof(s_data));
  2811. ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
  2812. // Receive the message
  2813. union {
  2814. // Space large enough to hold an 'int'
  2815. char control[CMSG_SPACE(sizeof(int))];
  2816. struct cmsghdr cmh;
  2817. } r_u;
  2818. struct msghdr msgh;
  2819. struct iovec iov;
  2820. int r_data = 0;
  2821. msgh.msg_control = r_u.control;
  2822. msgh.msg_controllen = sizeof(r_u.control);
  2823. msgh.msg_name = nullptr;
  2824. msgh.msg_namelen = 0;
  2825. msgh.msg_iov = &iov;
  2826. msgh.msg_iovlen = 1;
  2827. iov.iov_base = &r_data;
  2828. iov.iov_len = sizeof(r_data);
  2829. // Receive data
  2830. ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
  2831. // Validate the received message
  2832. ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
  2833. ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
  2834. ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
  2835. ASSERT_EQ(r_data, s_data);
  2836. int fd = 0;
  2837. memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
  2838. ASSERT_NE(fd, 0);
  2839. SCOPE_EXIT {
  2840. close(fd);
  2841. };
  2842. std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
  2843. // Reposition to the beginning of the file
  2844. ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
  2845. // Read the magic string back, and compare it with the original
  2846. ASSERT_EQ(
  2847. magicString.length(),
  2848. read(fd, transferredMagicString.data(), transferredMagicString.size()));
  2849. ASSERT_TRUE(std::equal(
  2850. magicString.begin(), magicString.end(), transferredMagicString.begin()));
  2851. }
  2852. TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
  2853. // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
  2854. // Socket (UDS) does not support MSG_ERRQUEUE. So
  2855. // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
  2856. // breaks application message flow. To avoid this problem,
  2857. // AsyncSocket currently disables setErrMessageCB for UDS.
  2858. //
  2859. // This tests two things for UDS
  2860. // 1. setErrMessageCB fails
  2861. // 2. recvmsg(MSG_ERRQUEUE) reads application data
  2862. //
  2863. // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
  2864. int fd[2];
  2865. EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
  2866. ASSERT_NE(fd[0], -1);
  2867. ASSERT_NE(fd[1], -1);
  2868. SCOPE_EXIT {
  2869. close(fd[1]);
  2870. };
  2871. EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
  2872. EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
  2873. EventBase evb;
  2874. std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
  2875. // setErrMessageCB should fail for unix domain socket
  2876. TestErrMessageCallback errMsgCB;
  2877. ASSERT_NE(&errMsgCB, nullptr);
  2878. socket->setErrMessageCB(&errMsgCB);
  2879. ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
  2880. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  2881. // The following verifies that MSG_ERRQUEUE does not work for UDS,
  2882. // and recvmsg reads application data
  2883. union {
  2884. // Space large enough to hold an 'int'
  2885. char control[CMSG_SPACE(sizeof(int))];
  2886. struct cmsghdr cmh;
  2887. } r_u;
  2888. struct msghdr msgh;
  2889. struct iovec iov;
  2890. int recv_data = 0;
  2891. msgh.msg_control = r_u.control;
  2892. msgh.msg_controllen = sizeof(r_u.control);
  2893. msgh.msg_name = nullptr;
  2894. msgh.msg_namelen = 0;
  2895. msgh.msg_iov = &iov;
  2896. msgh.msg_iovlen = 1;
  2897. iov.iov_base = &recv_data;
  2898. iov.iov_len = sizeof(recv_data);
  2899. // there is no data, recvmsg should fail
  2900. EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
  2901. EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
  2902. // provide some application data, error queue should be empty if it exists
  2903. // However, UDS reads application data as error message
  2904. int test_data = 123456;
  2905. WriteCallback wcb;
  2906. socket->write(&wcb, &test_data, sizeof(test_data));
  2907. recv_data = 0;
  2908. ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
  2909. ASSERT_EQ(recv_data, test_data);
  2910. #endif // FOLLY_HAVE_MSG_ERRQUEUE
  2911. }
  2912. TEST(AsyncSocketTest, V6TosReflectTest) {
  2913. EventBase eventBase;
  2914. // Create a server socket
  2915. std::shared_ptr<AsyncServerSocket> serverSocket(
  2916. AsyncServerSocket::newSocket(&eventBase));
  2917. folly::IPAddress ip("::1");
  2918. std::vector<folly::IPAddress> serverIp;
  2919. serverIp.push_back(ip);
  2920. serverSocket->bind(serverIp, 0);
  2921. serverSocket->listen(16);
  2922. folly::SocketAddress serverAddress;
  2923. serverSocket->getAddress(&serverAddress);
  2924. // Enable TOS reflect
  2925. serverSocket->setTosReflect(true);
  2926. // Add a callback to accept one connection then stop the loop
  2927. TestAcceptCallback acceptCallback;
  2928. acceptCallback.setConnectionAcceptedFn(
  2929. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  2930. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  2931. });
  2932. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  2933. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  2934. });
  2935. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  2936. serverSocket->startAccepting();
  2937. // Create a client socket, setsockopt() the TOS before connecting
  2938. auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
  2939. ConnCallback* ccb,
  2940. EventBase* evb,
  2941. folly::SocketAddress sAddr) {
  2942. clientSock = AsyncSocket::newSocket(evb);
  2943. AsyncSocket::OptionKey v6Opts = {IPPROTO_IPV6, IPV6_TCLASS};
  2944. AsyncSocket::OptionMap optionMap;
  2945. optionMap.insert({v6Opts, 0x2c});
  2946. SocketAddress bindAddr("0.0.0.0", 0);
  2947. clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
  2948. };
  2949. std::shared_ptr<AsyncSocket> socket(nullptr);
  2950. ConnCallback cb;
  2951. clientThread(socket, &cb, &eventBase, serverAddress);
  2952. eventBase.loop();
  2953. // Verify if the connection is accepted and if the accepted socket has
  2954. // setsockopt on the TOS for the same value that was on the client socket
  2955. int fd = acceptCallback.getEvents()->at(1).fd;
  2956. ASSERT_GE(fd, 0);
  2957. int value;
  2958. socklen_t valueLength = sizeof(value);
  2959. int rc = getsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &value, &valueLength);
  2960. ASSERT_EQ(rc, 0);
  2961. ASSERT_EQ(value, 0x2c);
  2962. }
  2963. TEST(AsyncSocketTest, V4TosReflectTest) {
  2964. EventBase eventBase;
  2965. // Create a server socket
  2966. std::shared_ptr<AsyncServerSocket> serverSocket(
  2967. AsyncServerSocket::newSocket(&eventBase));
  2968. folly::IPAddress ip("127.0.0.1");
  2969. std::vector<folly::IPAddress> serverIp;
  2970. serverIp.push_back(ip);
  2971. serverSocket->bind(serverIp, 0);
  2972. serverSocket->listen(16);
  2973. folly::SocketAddress serverAddress;
  2974. serverSocket->getAddress(&serverAddress);
  2975. // Enable TOS reflect
  2976. serverSocket->setTosReflect(true);
  2977. // Add a callback to accept one connection then stop the loop
  2978. TestAcceptCallback acceptCallback;
  2979. acceptCallback.setConnectionAcceptedFn(
  2980. [&](int /* fd */, const folly::SocketAddress& /* addr */) {
  2981. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  2982. });
  2983. acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
  2984. serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  2985. });
  2986. serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  2987. serverSocket->startAccepting();
  2988. // Create a client socket, setsockopt() the TOS before connecting
  2989. auto clientThread = [](std::shared_ptr<AsyncSocket>& clientSock,
  2990. ConnCallback* ccb,
  2991. EventBase* evb,
  2992. folly::SocketAddress sAddr) {
  2993. clientSock = AsyncSocket::newSocket(evb);
  2994. AsyncSocket::OptionKey v4Opts = {IPPROTO_IP, IP_TOS};
  2995. AsyncSocket::OptionMap optionMap;
  2996. optionMap.insert({v4Opts, 0x2c});
  2997. SocketAddress bindAddr("0.0.0.0", 0);
  2998. clientSock->connect(ccb, sAddr, 30, optionMap, bindAddr);
  2999. };
  3000. std::shared_ptr<AsyncSocket> socket(nullptr);
  3001. ConnCallback cb;
  3002. clientThread(socket, &cb, &eventBase, serverAddress);
  3003. eventBase.loop();
  3004. // Verify if the connection is accepted and if the accepted socket has
  3005. // setsockopt on the TOS for the same value that was on the client socket
  3006. int fd = acceptCallback.getEvents()->at(1).fd;
  3007. ASSERT_GE(fd, 0);
  3008. int value;
  3009. socklen_t valueLength = sizeof(value);
  3010. int rc = getsockopt(fd, IPPROTO_IP, IP_TOS, &value, &valueLength);
  3011. ASSERT_EQ(rc, 0);
  3012. ASSERT_EQ(value, 0x2c);
  3013. }
  3014. #endif