AsyncUDPSocketGSOTest.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <numeric>
  17. #include <thread>
  18. #include <folly/Conv.h>
  19. #include <folly/SocketAddress.h>
  20. #include <folly/io/IOBuf.h>
  21. #include <folly/io/async/AsyncTimeout.h>
  22. #include <folly/io/async/AsyncUDPServerSocket.h>
  23. #include <folly/io/async/AsyncUDPSocket.h>
  24. #include <folly/io/async/EventBase.h>
  25. #include <folly/portability/GMock.h>
  26. #include <folly/portability/GTest.h>
  27. using folly::AsyncTimeout;
  28. using folly::AsyncUDPServerSocket;
  29. using folly::AsyncUDPSocket;
  30. using folly::EventBase;
  31. using folly::IOBuf;
  32. using folly::SocketAddress;
  33. using namespace testing;
  34. struct TestData {
  35. TestData(
  36. int gso,
  37. bool useSocketGSO,
  38. int* in,
  39. size_t inLen,
  40. int* expected,
  41. size_t expectedLen)
  42. : gso_(gso), useSocketGSO_(useSocketGSO) {
  43. in_.assign(in, in + inLen);
  44. expected_.assign(expected, expected + expectedLen);
  45. expectedSize_ = std::accumulate(expected_.begin(), expected_.end(), 0);
  46. }
  47. bool checkIn() const {
  48. return (expectedSize_ == std::accumulate(in_.begin(), in_.end(), 0));
  49. }
  50. bool checkOut() const {
  51. return (expectedSize_ == std::accumulate(out_.begin(), out_.end(), 0));
  52. }
  53. bool appendOut(int num) {
  54. out_.push_back(num);
  55. outSize_ += num;
  56. return (outSize_ >= expectedSize_);
  57. }
  58. std::unique_ptr<folly::IOBuf> getInBuf() {
  59. if (!in_.size()) {
  60. return nullptr;
  61. }
  62. std::string str(in_[0], 'A');
  63. std::unique_ptr<folly::IOBuf> ret =
  64. folly::IOBuf::copyBuffer(str.data(), str.size());
  65. for (size_t i = 1; i < in_.size(); i++) {
  66. str = std::string(in_[i], 'A');
  67. ret->prependChain(folly::IOBuf::copyBuffer(str.data(), str.size()));
  68. }
  69. return ret;
  70. }
  71. int gso_{0};
  72. bool useSocketGSO_{false};
  73. std::vector<int> in_;
  74. std::vector<int> expected_; // expected
  75. int expectedSize_;
  76. std::vector<int> out_;
  77. int outSize_{0};
  78. };
  79. class UDPAcceptor : public AsyncUDPServerSocket::Callback {
  80. public:
  81. UDPAcceptor(EventBase* evb) : evb_(evb) {}
  82. void onListenStarted() noexcept override {}
  83. void onListenStopped() noexcept override {}
  84. void onDataAvailable(
  85. std::shared_ptr<folly::AsyncUDPSocket> socket,
  86. const folly::SocketAddress& client,
  87. std::unique_ptr<folly::IOBuf> data,
  88. bool /*unused*/) noexcept override {
  89. // send pong
  90. socket->write(client, data->clone());
  91. }
  92. private:
  93. EventBase* const evb_{nullptr};
  94. };
  95. class UDPServer {
  96. public:
  97. UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
  98. : evb_(evb), addr_(addr), evbs_(n) {}
  99. void start() {
  100. CHECK(evb_->isInEventBaseThread());
  101. socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
  102. try {
  103. socket_->bind(addr_);
  104. VLOG(4) << "Server listening on " << socket_->address().describe();
  105. } catch (const std::exception& ex) {
  106. LOG(FATAL) << ex.what();
  107. }
  108. acceptors_.reserve(evbs_.size());
  109. threads_.reserve(evbs_.size());
  110. // Add numWorkers thread
  111. int i = 0;
  112. for (auto& evb : evbs_) {
  113. acceptors_.emplace_back(&evb);
  114. std::thread t([&]() { evb.loopForever(); });
  115. evb.waitUntilRunning();
  116. socket_->addListener(&evb, &acceptors_[i]);
  117. threads_.emplace_back(std::move(t));
  118. ++i;
  119. }
  120. socket_->listen();
  121. }
  122. folly::SocketAddress address() const {
  123. return socket_->address();
  124. }
  125. void shutdown() {
  126. CHECK(evb_->isInEventBaseThread());
  127. socket_->close();
  128. socket_.reset();
  129. for (auto& evb : evbs_) {
  130. evb.terminateLoopSoon();
  131. }
  132. for (auto& t : threads_) {
  133. t.join();
  134. }
  135. }
  136. void pauseAccepting() {
  137. socket_->pauseAccepting();
  138. }
  139. void resumeAccepting() {
  140. socket_->resumeAccepting();
  141. }
  142. private:
  143. EventBase* const evb_{nullptr};
  144. const folly::SocketAddress addr_;
  145. std::unique_ptr<AsyncUDPServerSocket> socket_;
  146. std::vector<std::thread> threads_;
  147. std::vector<folly::EventBase> evbs_;
  148. std::vector<UDPAcceptor> acceptors_;
  149. };
  150. class UDPClient : private AsyncUDPSocket::ReadCallback, private AsyncTimeout {
  151. public:
  152. explicit UDPClient(EventBase* evb, TestData& testData)
  153. : AsyncTimeout(evb), evb_(evb), testData_(testData) {}
  154. void start(const folly::SocketAddress& server) {
  155. CHECK(evb_->isInEventBaseThread());
  156. server_ = server;
  157. socket_ = std::make_unique<AsyncUDPSocket>(evb_);
  158. try {
  159. socket_->bind(folly::SocketAddress("127.0.0.1", 0));
  160. if (connectAddr_) {
  161. connect();
  162. }
  163. VLOG(2) << "Client bound to " << socket_->address().describe();
  164. } catch (const std::exception& ex) {
  165. LOG(FATAL) << ex.what();
  166. }
  167. // succeed if GSO not available
  168. if (socket_->getGSO() < 0) {
  169. LOG(INFO) << "GSO not supported";
  170. testData_.out_ = testData_.expected_;
  171. shutdown();
  172. return;
  173. }
  174. if (testData_.useSocketGSO_) {
  175. socket_->setGSO(testData_.gso_);
  176. } else {
  177. socket_->setGSO(0);
  178. }
  179. socket_->resumeRead(this);
  180. // Start playing ping pong
  181. sendPing();
  182. }
  183. void connect() {
  184. int ret = socket_->connect(*connectAddr_);
  185. if (ret != 0) {
  186. throw folly::AsyncSocketException(
  187. folly::AsyncSocketException::NOT_OPEN, "ConnectFail", errno);
  188. }
  189. VLOG(2) << "Client connected to address=" << *connectAddr_;
  190. }
  191. void shutdown() {
  192. CHECK(evb_->isInEventBaseThread());
  193. socket_->pauseRead();
  194. socket_->close();
  195. socket_.reset();
  196. evb_->terminateLoopSoon();
  197. }
  198. void sendPing() {
  199. scheduleTimeout(5);
  200. writePing(
  201. testData_.getInBuf(), testData_.useSocketGSO_ ? -1 : testData_.gso_);
  202. }
  203. virtual void writePing(std::unique_ptr<folly::IOBuf> buf, int gso) {
  204. socket_->writeGSO(server_, std::move(buf), gso);
  205. }
  206. void getReadBuffer(void** buf, size_t* len) noexcept override {
  207. *buf = buf_;
  208. *len = sizeof(buf_);
  209. }
  210. void onDataAvailable(
  211. const folly::SocketAddress& /*unused*/,
  212. size_t len,
  213. bool /*unused*/) noexcept override {
  214. VLOG(0) << "Got " << len << " bytes";
  215. if (testData_.appendOut(len)) {
  216. shutdown();
  217. }
  218. }
  219. void onReadError(const folly::AsyncSocketException& ex) noexcept override {
  220. VLOG(4) << ex.what();
  221. // Start listening for next PONG
  222. socket_->resumeRead(this);
  223. }
  224. void onReadClosed() noexcept override {
  225. CHECK(false) << "We unregister reads before closing";
  226. }
  227. void timeoutExpired() noexcept override {
  228. VLOG(4) << "Timeout expired";
  229. shutdown();
  230. }
  231. AsyncUDPSocket& getSocket() {
  232. return *socket_;
  233. }
  234. void setShouldConnect(const folly::SocketAddress& connectAddr) {
  235. connectAddr_ = connectAddr;
  236. }
  237. protected:
  238. folly::Optional<folly::SocketAddress> connectAddr_;
  239. EventBase* const evb_{nullptr};
  240. folly::SocketAddress server_;
  241. std::unique_ptr<AsyncUDPSocket> socket_;
  242. private:
  243. char buf_[2048];
  244. TestData& testData_;
  245. };
  246. class AsyncSocketGSOIntegrationTest : public Test {
  247. public:
  248. void SetUp() override {
  249. server = std::make_unique<UDPServer>(
  250. &sevb, folly::SocketAddress("127.0.0.1", 0), 1);
  251. // Start event loop in a separate thread
  252. serverThread =
  253. std::make_unique<std::thread>([this]() { sevb.loopForever(); });
  254. // Wait for event loop to start
  255. sevb.waitUntilRunning();
  256. }
  257. void startServer() {
  258. // Start the server
  259. sevb.runInEventBaseThreadAndWait([&]() { server->start(); });
  260. LOG(INFO) << "Server listening=" << server->address();
  261. }
  262. void TearDown() override {
  263. // Shutdown server
  264. sevb.runInEventBaseThread([&]() {
  265. server->shutdown();
  266. sevb.terminateLoopSoon();
  267. });
  268. // Wait for server thread to join
  269. serverThread->join();
  270. }
  271. std::unique_ptr<UDPClient> performPingPongTest(
  272. TestData& testData,
  273. folly::Optional<folly::SocketAddress> connectedAddress);
  274. folly::EventBase sevb;
  275. folly::EventBase cevb;
  276. TestData* testData_{nullptr};
  277. std::unique_ptr<std::thread> serverThread;
  278. std::unique_ptr<UDPServer> server;
  279. std::unique_ptr<UDPClient> client;
  280. };
  281. std::unique_ptr<UDPClient> AsyncSocketGSOIntegrationTest::performPingPongTest(
  282. TestData& testData,
  283. folly::Optional<folly::SocketAddress> connectedAddress) {
  284. testData_ = &testData;
  285. client = std::make_unique<UDPClient>(&cevb, testData);
  286. if (connectedAddress) {
  287. client->setShouldConnect(*connectedAddress);
  288. }
  289. // Start event loop in a separate thread
  290. auto clientThread = std::thread([this]() { cevb.loopForever(); });
  291. // Wait for event loop to start
  292. cevb.waitUntilRunning();
  293. // Send ping
  294. cevb.runInEventBaseThread([&]() { client->start(server->address()); });
  295. // Wait for client to finish
  296. clientThread.join();
  297. return std::move(client);
  298. }
  299. TEST_F(AsyncSocketGSOIntegrationTest, PingPongGlobalGSO) {
  300. int gso = 1000;
  301. int in[] = {100, 1200, 3000, 200, 100, 300};
  302. int expected[] = {1000, 1000, 1000, 1000, 900};
  303. TestData testData(
  304. gso,
  305. true /*useSocketGSO*/,
  306. in,
  307. sizeof(in) / sizeof(in[0]),
  308. expected,
  309. sizeof(expected) / sizeof(expected[0]));
  310. ASSERT_TRUE(testData.checkIn());
  311. startServer();
  312. auto pingClient = performPingPongTest(testData, folly::none);
  313. ASSERT_TRUE(testData.checkOut());
  314. }
  315. TEST_F(AsyncSocketGSOIntegrationTest, PingPongRequestGSO) {
  316. int gso = 421;
  317. int in[] = {100, 1200, 3000, 200, 100, 300};
  318. int expected[] = {421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 421, 269};
  319. TestData testData(
  320. gso,
  321. false /*useSocketGSO*/,
  322. in,
  323. sizeof(in) / sizeof(in[0]),
  324. expected,
  325. sizeof(expected) / sizeof(expected[0]));
  326. ASSERT_TRUE(testData.checkIn());
  327. startServer();
  328. auto pingClient = performPingPongTest(testData, folly::none);
  329. ASSERT_TRUE(testData.checkOut());
  330. }
  // Buffer sizes used by the send test: two chunk sizes and their sum, which
  // doubles as the GSO segment size passed to writeGSO().
  constexpr int kGSO1{100};
  constexpr int kGSO2{200};
  constexpr int kGSO{kGSO1 + kGSO2};
  335. class GSOBuf {
  336. public:
  337. explicit GSOBuf(size_t size1, size_t size2 = 0) {
  338. std::string str(size1, 'A');
  339. ioBuf_ = folly::IOBuf::copyBuffer(str.data(), str.size());
  340. if (size2) {
  341. str = std::string(size2, 'B');
  342. auto tmp = folly::IOBuf::copyBuffer(str.data(), str.size());
  343. ioBuf_->prependChain(std::move(tmp));
  344. }
  345. }
  346. const std::unique_ptr<IOBuf>& get() const {
  347. return ioBuf_;
  348. }
  349. private:
  350. std::unique_ptr<IOBuf> ioBuf_;
  351. };
  352. class GSOSendTest {
  353. public:
  354. explicit GSOSendTest(
  355. folly::AsyncUDPSocket& socket,
  356. const folly::SocketAddress& address,
  357. int gso,
  358. size_t size1,
  359. size_t size2 = 0) {
  360. GSOBuf buf(size1, size2);
  361. ret_ = socket.writeGSO(address, buf.get(), gso);
  362. }
  363. ssize_t get() const {
  364. return ret_;
  365. }
  366. private:
  367. ssize_t ret_;
  368. };
  369. TEST(AsyncSocketGSOTest, send) {
  370. EventBase evb;
  371. folly::AsyncUDPSocket client(&evb);
  372. client.bind(folly::SocketAddress("127.0.0.1", 0));
  373. if (client.getGSO() < 0) {
  374. LOG(INFO) << "GSO not supported";
  375. // GSO not supported
  376. return;
  377. }
  378. folly::AsyncUDPSocket server(&evb);
  379. server.bind(folly::SocketAddress("127.0.0.1", 0));
  380. // send less than GSO in a single IOBuf
  381. {
  382. GSOSendTest test(client, server.address(), kGSO, kGSO - 1);
  383. CHECK_LT(test.get(), 0);
  384. }
  385. // send less than GSO in multiple IOBufs
  386. {
  387. GSOSendTest test(client, server.address(), kGSO, kGSO1 - 1, kGSO2);
  388. CHECK_LT(test.get(), 0);
  389. }
  390. // send GSO in a single IOBuf
  391. {
  392. GSOSendTest test(client, server.address(), kGSO, kGSO);
  393. CHECK_LT(test.get(), 0);
  394. }
  395. // send GSO in multiple IOBuf
  396. {
  397. GSOSendTest test(client, server.address(), kGSO, kGSO1, kGSO2);
  398. CHECK_LT(test.get(), 0);
  399. }
  400. // send more than GSO in a single IOBuf
  401. {
  402. GSOSendTest test(client, server.address(), kGSO, kGSO + 1);
  403. CHECK_EQ(test.get(), kGSO + 1);
  404. }
  405. // send more than GSO in a multiple IOBufs
  406. {
  407. GSOSendTest test(client, server.address(), kGSO, kGSO1 + 1, kGSO2 + 1);
  408. CHECK_EQ(test.get(), kGSO + 2);
  409. }
  410. }