AsyncUDPSocket.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/async/AsyncUDPSocket.h>
  17. #include <folly/Likely.h>
  18. #include <folly/io/async/EventBase.h>
  19. #include <folly/portability/Fcntl.h>
  20. #include <folly/portability/Sockets.h>
  21. #include <folly/portability/Unistd.h>
  22. #include <errno.h>
  23. // Due to the way kernel headers are included, this may or may not be defined.
  24. // Number pulled from 3.10 kernel headers.
  25. #ifndef SO_REUSEPORT
  26. #define SO_REUSEPORT 15
  27. #endif
  28. namespace fsp = folly::portability::sockets;
  29. namespace folly {
  30. AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
  31. : EventHandler(CHECK_NOTNULL(evb)),
  32. readCallback_(nullptr),
  33. eventBase_(evb),
  34. fd_(-1) {
  35. evb->dcheckIsInEventBaseThread();
  36. }
  37. AsyncUDPSocket::~AsyncUDPSocket() {
  38. if (fd_ != -1) {
  39. close();
  40. }
  41. }
  42. void AsyncUDPSocket::bind(const folly::SocketAddress& address) {
  43. int socket = fsp::socket(address.getFamily(), SOCK_DGRAM, IPPROTO_UDP);
  44. if (socket == -1) {
  45. throw AsyncSocketException(
  46. AsyncSocketException::NOT_OPEN,
  47. "error creating async udp socket",
  48. errno);
  49. }
  50. auto g = folly::makeGuard([&] { ::close(socket); });
  51. // put the socket in non-blocking mode
  52. int ret = fcntl(socket, F_SETFL, O_NONBLOCK);
  53. if (ret != 0) {
  54. throw AsyncSocketException(
  55. AsyncSocketException::NOT_OPEN,
  56. "failed to put socket in non-blocking mode",
  57. errno);
  58. }
  59. if (reuseAddr_) {
  60. // put the socket in reuse mode
  61. int value = 1;
  62. if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) !=
  63. 0) {
  64. throw AsyncSocketException(
  65. AsyncSocketException::NOT_OPEN,
  66. "failed to put socket in reuse mode",
  67. errno);
  68. }
  69. }
  70. if (reusePort_) {
  71. // put the socket in port reuse mode
  72. int value = 1;
  73. if (setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value)) !=
  74. 0) {
  75. throw AsyncSocketException(
  76. AsyncSocketException::NOT_OPEN,
  77. "failed to put socket in reuse_port mode",
  78. errno);
  79. }
  80. }
  81. if (busyPollUs_ > 0) {
  82. #ifdef SO_BUSY_POLL
  83. // Set busy_poll time in microseconds on the socket.
  84. // It sets how long socket will be in busy_poll mode when no event occurs.
  85. int value = busyPollUs_;
  86. if (setsockopt(socket, SOL_SOCKET, SO_BUSY_POLL, &value, sizeof(value)) !=
  87. 0) {
  88. throw AsyncSocketException(
  89. AsyncSocketException::NOT_OPEN,
  90. "failed to set SO_BUSY_POLL on the socket",
  91. errno);
  92. }
  93. #else /* SO_BUSY_POLL is not supported*/
  94. throw AsyncSocketException(
  95. AsyncSocketException::NOT_OPEN, "SO_BUSY_POLL is not supported", errno);
  96. #endif
  97. }
  98. if (rcvBuf_ > 0) {
  99. // Set the size of the buffer for the received messages in rx_queues.
  100. int value = rcvBuf_;
  101. if (setsockopt(socket, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) != 0) {
  102. throw AsyncSocketException(
  103. AsyncSocketException::NOT_OPEN,
  104. "failed to set SO_RCVBUF on the socket",
  105. errno);
  106. }
  107. }
  108. if (sndBuf_ > 0) {
  109. // Set the size of the buffer for the sent messages in tx_queues.
  110. int value = rcvBuf_;
  111. if (setsockopt(socket, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) != 0) {
  112. throw AsyncSocketException(
  113. AsyncSocketException::NOT_OPEN,
  114. "failed to set SO_SNDBUF on the socket",
  115. errno);
  116. }
  117. }
  118. // If we're using IPv6, make sure we don't accept V4-mapped connections
  119. if (address.getFamily() == AF_INET6) {
  120. int flag = 1;
  121. if (setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag))) {
  122. throw AsyncSocketException(
  123. AsyncSocketException::NOT_OPEN, "Failed to set IPV6_V6ONLY", errno);
  124. }
  125. }
  126. // bind to the address
  127. sockaddr_storage addrStorage;
  128. address.getAddress(&addrStorage);
  129. sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
  130. if (fsp::bind(socket, saddr, address.getActualSize()) != 0) {
  131. throw AsyncSocketException(
  132. AsyncSocketException::NOT_OPEN,
  133. "failed to bind the async udp socket for:" + address.describe(),
  134. errno);
  135. }
  136. // success
  137. g.dismiss();
  138. fd_ = socket;
  139. ownership_ = FDOwnership::OWNS;
  140. // attach to EventHandler
  141. EventHandler::changeHandlerFD(fd_);
  142. if (address.getPort() != 0) {
  143. localAddress_ = address;
  144. } else {
  145. localAddress_.setFromLocalAddress(fd_);
  146. }
  147. }
  148. void AsyncUDPSocket::dontFragment(bool df) {
  149. (void)df; // to avoid potential unused variable warning
  150. #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DO) && \
  151. defined(IP_PMTUDISC_WANT)
  152. if (address().getFamily() == AF_INET) {
  153. int v4 = df ? IP_PMTUDISC_DO : IP_PMTUDISC_WANT;
  154. if (fsp::setsockopt(fd_, IPPROTO_IP, IP_MTU_DISCOVER, &v4, sizeof(v4))) {
  155. throw AsyncSocketException(
  156. AsyncSocketException::NOT_OPEN,
  157. "Failed to set DF with IP_MTU_DISCOVER",
  158. errno);
  159. }
  160. }
  161. #endif
  162. #if defined(IPV6_MTU_DISCOVER) && defined(IPV6_PMTUDISC_DO) && \
  163. defined(IPV6_PMTUDISC_WANT)
  164. if (address().getFamily() == AF_INET6) {
  165. int v6 = df ? IPV6_PMTUDISC_DO : IPV6_PMTUDISC_WANT;
  166. if (fsp::setsockopt(
  167. fd_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &v6, sizeof(v6))) {
  168. throw AsyncSocketException(
  169. AsyncSocketException::NOT_OPEN,
  170. "Failed to set DF with IPV6_MTU_DISCOVER",
  171. errno);
  172. }
  173. }
  174. #endif
  175. }
  176. void AsyncUDPSocket::setErrMessageCallback(
  177. ErrMessageCallback* errMessageCallback) {
  178. errMessageCallback_ = errMessageCallback;
  179. int err = (errMessageCallback_ != nullptr);
  180. #if defined(IP_RECVERR)
  181. if (address().getFamily() == AF_INET &&
  182. fsp::setsockopt(fd_, IPPROTO_IP, IP_RECVERR, &err, sizeof(err))) {
  183. throw AsyncSocketException(
  184. AsyncSocketException::NOT_OPEN, "Failed to set IP_RECVERR", errno);
  185. }
  186. #endif
  187. #if defined(IPV6_RECVERR)
  188. if (address().getFamily() == AF_INET6 &&
  189. fsp::setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVERR, &err, sizeof(err))) {
  190. throw AsyncSocketException(
  191. AsyncSocketException::NOT_OPEN, "Failed to set IPV6_RECVERR", errno);
  192. }
  193. #endif
  194. (void)err;
  195. }
  196. void AsyncUDPSocket::setFD(int fd, FDOwnership ownership) {
  197. CHECK_EQ(-1, fd_) << "Already bound to another FD";
  198. fd_ = fd;
  199. ownership_ = ownership;
  200. EventHandler::changeHandlerFD(fd_);
  201. localAddress_.setFromLocalAddress(fd_);
  202. }
  203. ssize_t AsyncUDPSocket::writeGSO(
  204. const folly::SocketAddress& address,
  205. const std::unique_ptr<folly::IOBuf>& buf,
  206. int gso) {
  207. // UDP's typical MTU size is 1500, so high number of buffers
  208. // really do not make sense. Optimize for buffer chains with
  209. // buffers less than 16, which is the highest I can think of
  210. // for a real use case.
  211. iovec vec[16];
  212. size_t iovec_len = buf->fillIov(vec, sizeof(vec) / sizeof(vec[0]));
  213. if (UNLIKELY(iovec_len == 0)) {
  214. buf->coalesce();
  215. vec[0].iov_base = const_cast<uint8_t*>(buf->data());
  216. vec[0].iov_len = buf->length();
  217. iovec_len = 1;
  218. }
  219. return writev(address, vec, iovec_len, gso);
  220. }
  221. ssize_t AsyncUDPSocket::write(
  222. const folly::SocketAddress& address,
  223. const std::unique_ptr<folly::IOBuf>& buf) {
  224. return writeGSO(address, buf, 0);
  225. }
  226. ssize_t AsyncUDPSocket::writev(
  227. const folly::SocketAddress& address,
  228. const struct iovec* vec,
  229. size_t iovec_len,
  230. int gso) {
  231. CHECK_NE(-1, fd_) << "Socket not yet bound";
  232. sockaddr_storage addrStorage;
  233. address.getAddress(&addrStorage);
  234. struct msghdr msg;
  235. msg.msg_name = reinterpret_cast<void*>(&addrStorage);
  236. msg.msg_namelen = address.getActualSize();
  237. msg.msg_iov = const_cast<struct iovec*>(vec);
  238. msg.msg_iovlen = iovec_len;
  239. msg.msg_control = nullptr;
  240. msg.msg_controllen = 0;
  241. msg.msg_flags = 0;
  242. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  243. if (gso > 0) {
  244. char control[CMSG_SPACE(sizeof(uint16_t))];
  245. msg.msg_control = control;
  246. msg.msg_controllen = sizeof(control);
  247. struct cmsghdr* cm = CMSG_FIRSTHDR(&msg);
  248. cm->cmsg_level = SOL_UDP;
  249. cm->cmsg_type = UDP_SEGMENT;
  250. cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
  251. uint16_t gso_len = static_cast<uint16_t>(gso);
  252. memcpy(CMSG_DATA(cm), &gso_len, sizeof(gso_len));
  253. return sendmsg(fd_, &msg, 0);
  254. }
  255. #else
  256. CHECK_LT(gso, 1) << "GSO not supported";
  257. #endif
  258. return sendmsg(fd_, &msg, 0);
  259. }
  260. ssize_t AsyncUDPSocket::writev(
  261. const folly::SocketAddress& address,
  262. const struct iovec* vec,
  263. size_t iovec_len) {
  264. return writev(address, vec, iovec_len, 0);
  265. }
  266. void AsyncUDPSocket::resumeRead(ReadCallback* cob) {
  267. CHECK(!readCallback_) << "Another read callback already installed";
  268. CHECK_NE(-1, fd_) << "UDP server socket not yet bind to an address";
  269. readCallback_ = CHECK_NOTNULL(cob);
  270. if (!updateRegistration()) {
  271. AsyncSocketException ex(
  272. AsyncSocketException::NOT_OPEN, "failed to register for accept events");
  273. readCallback_ = nullptr;
  274. cob->onReadError(ex);
  275. return;
  276. }
  277. }
  278. void AsyncUDPSocket::pauseRead() {
  279. // It is ok to pause an already paused socket
  280. readCallback_ = nullptr;
  281. updateRegistration();
  282. }
  283. void AsyncUDPSocket::close() {
  284. eventBase_->dcheckIsInEventBaseThread();
  285. if (readCallback_) {
  286. auto cob = readCallback_;
  287. readCallback_ = nullptr;
  288. cob->onReadClosed();
  289. }
  290. // Unregister any events we are registered for
  291. unregisterHandler();
  292. if (fd_ != -1 && ownership_ == FDOwnership::OWNS) {
  293. ::close(fd_);
  294. }
  295. fd_ = -1;
  296. }
  297. void AsyncUDPSocket::handlerReady(uint16_t events) noexcept {
  298. if (events & EventHandler::READ) {
  299. DCHECK(readCallback_);
  300. handleRead();
  301. }
  302. }
  303. size_t AsyncUDPSocket::handleErrMessages() noexcept {
  304. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  305. if (errMessageCallback_ == nullptr) {
  306. return 0;
  307. }
  308. uint8_t ctrl[1024];
  309. unsigned char data;
  310. struct msghdr msg;
  311. iovec entry;
  312. entry.iov_base = &data;
  313. entry.iov_len = sizeof(data);
  314. msg.msg_iov = &entry;
  315. msg.msg_iovlen = 1;
  316. msg.msg_name = nullptr;
  317. msg.msg_namelen = 0;
  318. msg.msg_control = ctrl;
  319. msg.msg_controllen = sizeof(ctrl);
  320. msg.msg_flags = 0;
  321. int ret;
  322. size_t num = 0;
  323. while (fd_ != -1) {
  324. ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
  325. VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
  326. if (ret < 0) {
  327. if (errno != EAGAIN) {
  328. auto errnoCopy = errno;
  329. LOG(ERROR) << "::recvmsg exited with code " << ret
  330. << ", errno: " << errnoCopy;
  331. AsyncSocketException ex(
  332. AsyncSocketException::INTERNAL_ERROR,
  333. "recvmsg() failed",
  334. errnoCopy);
  335. failErrMessageRead(ex);
  336. }
  337. return num;
  338. }
  339. for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
  340. cmsg != nullptr && cmsg->cmsg_len != 0;
  341. cmsg = CMSG_NXTHDR(&msg, cmsg)) {
  342. ++num;
  343. errMessageCallback_->errMessage(*cmsg);
  344. if (fd_ == -1) {
  345. // once the socket is closed there is no use for more read errors.
  346. return num;
  347. }
  348. }
  349. }
  350. return num;
  351. #else
  352. return 0;
  353. #endif
  354. }
  355. void AsyncUDPSocket::failErrMessageRead(const AsyncSocketException& ex) {
  356. if (errMessageCallback_ != nullptr) {
  357. ErrMessageCallback* callback = errMessageCallback_;
  358. errMessageCallback_ = nullptr;
  359. callback->errMessageError(ex);
  360. }
  361. }
  362. int AsyncUDPSocket::connect(const folly::SocketAddress& address) {
  363. CHECK_NE(-1, fd_) << "Socket not yet bound";
  364. sockaddr_storage addrStorage;
  365. address.getAddress(&addrStorage);
  366. return fsp::connect(
  367. fd_, reinterpret_cast<sockaddr*>(&addrStorage), address.getActualSize());
  368. }
  369. void AsyncUDPSocket::handleRead() noexcept {
  370. void* buf{nullptr};
  371. size_t len{0};
  372. if (handleErrMessages()) {
  373. return;
  374. }
  375. if (fd_ == -1) {
  376. // The socket may have been closed by the error callbacks.
  377. return;
  378. }
  379. readCallback_->getReadBuffer(&buf, &len);
  380. if (buf == nullptr || len == 0) {
  381. AsyncSocketException ex(
  382. AsyncSocketException::BAD_ARGS,
  383. "AsyncUDPSocket::getReadBuffer() returned empty buffer");
  384. auto cob = readCallback_;
  385. readCallback_ = nullptr;
  386. cob->onReadError(ex);
  387. updateRegistration();
  388. return;
  389. }
  390. struct sockaddr_storage addrStorage;
  391. socklen_t addrLen = sizeof(addrStorage);
  392. memset(&addrStorage, 0, size_t(addrLen));
  393. struct sockaddr* rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
  394. rawAddr->sa_family = localAddress_.getFamily();
  395. ssize_t bytesRead = recvfrom(fd_, buf, len, MSG_TRUNC, rawAddr, &addrLen);
  396. if (bytesRead >= 0) {
  397. clientAddress_.setFromSockaddr(rawAddr, addrLen);
  398. if (bytesRead > 0) {
  399. bool truncated = false;
  400. if ((size_t)bytesRead > len) {
  401. truncated = true;
  402. bytesRead = ssize_t(len);
  403. }
  404. readCallback_->onDataAvailable(
  405. clientAddress_, size_t(bytesRead), truncated);
  406. }
  407. } else {
  408. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  409. // No data could be read without blocking the socket
  410. return;
  411. }
  412. AsyncSocketException ex(
  413. AsyncSocketException::INTERNAL_ERROR, "::recvfrom() failed", errno);
  414. // In case of UDP we can continue reading from the socket
  415. // even if the current request fails. We notify the user
  416. // so that he can do some logging/stats collection if he wants.
  417. auto cob = readCallback_;
  418. readCallback_ = nullptr;
  419. cob->onReadError(ex);
  420. updateRegistration();
  421. }
  422. }
  423. bool AsyncUDPSocket::updateRegistration() noexcept {
  424. uint16_t flags = NONE;
  425. if (readCallback_) {
  426. flags |= READ;
  427. }
  428. return registerHandler(uint16_t(flags | PERSIST));
  429. }
  430. bool AsyncUDPSocket::setGSO(int val) {
  431. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  432. int ret = ::setsockopt(fd_, SOL_UDP, UDP_SEGMENT, &val, sizeof(val));
  433. gso_ = ret ? -1 : val;
  434. return !ret;
  435. #else
  436. (void)val;
  437. return false;
  438. #endif
  439. }
  440. int AsyncUDPSocket::getGSO() {
  441. // check if we can return the cached value
  442. if (FOLLY_UNLIKELY(!gso_.hasValue())) {
  443. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  444. int gso = -1;
  445. socklen_t optlen = sizeof(gso);
  446. if (!::getsockopt(fd_, SOL_UDP, UDP_SEGMENT, &gso, &optlen)) {
  447. gso_ = gso;
  448. } else {
  449. gso_ = -1;
  450. }
  451. #else
  452. gso_ = -1;
  453. #endif
  454. }
  455. return gso_.value();
  456. }
  457. void AsyncUDPSocket::detachEventBase() {
  458. DCHECK(eventBase_ && eventBase_->isInEventBaseThread());
  459. registerHandler(uint16_t(NONE));
  460. eventBase_ = nullptr;
  461. EventHandler::detachEventBase();
  462. }
  463. void AsyncUDPSocket::attachEventBase(folly::EventBase* evb) {
  464. DCHECK(!eventBase_);
  465. DCHECK(evb && evb->isInEventBaseThread());
  466. eventBase_ = evb;
  467. EventHandler::attachEventBase(evb);
  468. updateRegistration();
  469. }
  470. } // namespace folly