AsyncServerSocket.cpp 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef __STDC_FORMAT_MACROS
  17. #define __STDC_FORMAT_MACROS
  18. #endif
  19. #include <folly/io/async/AsyncServerSocket.h>
  20. #include <folly/FileUtil.h>
  21. #include <folly/Portability.h>
  22. #include <folly/SocketAddress.h>
  23. #include <folly/String.h>
  24. #include <folly/detail/SocketFastOpen.h>
  25. #include <folly/io/async/EventBase.h>
  26. #include <folly/io/async/NotificationQueue.h>
  27. #include <folly/portability/Fcntl.h>
  28. #include <folly/portability/Sockets.h>
  29. #include <folly/portability/Unistd.h>
  30. #include <errno.h>
  31. #include <string.h>
  32. #include <sys/types.h>
  33. namespace fsp = folly::portability::sockets;
  34. namespace folly {
  35. #ifndef TCP_SAVE_SYN
  36. #define TCP_SAVE_SYN 27
  37. #endif
  38. #ifndef TCP_SAVED_SYN
  39. #define TCP_SAVED_SYN 28
  40. #endif
  41. static constexpr bool msgErrQueueSupported =
  42. #ifdef FOLLY_HAVE_MSG_ERRQUEUE
  43. true;
  44. #else
  45. false;
  46. #endif // FOLLY_HAVE_MSG_ERRQUEUE
  47. const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
  48. const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
  49. const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
  /**
   * Set or clear the FD_CLOEXEC descriptor flag on `fd`, leaving all other
   * descriptor flags untouched.
   *
   * @param fd     descriptor to modify
   * @param value  non-zero sets FD_CLOEXEC, zero clears it
   * @return -1 if the current flags cannot be read; otherwise the result of
   *         the F_SETFD fcntl() call
   */
  int setCloseOnExec(int fd, int value) {
    const int currentFlags = fcntl(fd, F_GETFD, 0);
    if (currentFlags < 0) {
      // Could not read the flags; report the failure to the caller.
      return -1;
    }

    // Flip only the close-on-exec bit.
    const int updatedFlags = (value != 0) ? (currentFlags | FD_CLOEXEC)
                                          : (currentFlags & ~FD_CLOEXEC);

    return fcntl(fd, F_SETFD, updatedFlags);
  }
  67. void AsyncServerSocket::RemoteAcceptor::start(
  68. EventBase* eventBase,
  69. uint32_t maxAtOnce,
  70. uint32_t maxInQueue) {
  71. setMaxReadAtOnce(maxAtOnce);
  72. queue_.setMaxQueueSize(maxInQueue);
  73. if (!eventBase->runInEventBaseThread([=]() {
  74. callback_->acceptStarted();
  75. this->startConsuming(eventBase, &queue_);
  76. })) {
  77. throw std::invalid_argument(
  78. "unable to start waiting on accept "
  79. "notification queue in the specified "
  80. "EventBase thread");
  81. }
  82. }
  83. void AsyncServerSocket::RemoteAcceptor::stop(
  84. EventBase* eventBase,
  85. AcceptCallback* callback) {
  86. if (!eventBase->runInEventBaseThread([=]() {
  87. callback->acceptStopped();
  88. delete this;
  89. })) {
  90. throw std::invalid_argument(
  91. "unable to start waiting on accept "
  92. "notification queue in the specified "
  93. "EventBase thread");
  94. }
  95. }
  96. void AsyncServerSocket::RemoteAcceptor::messageAvailable(
  97. QueueMessage&& msg) noexcept {
  98. switch (msg.type) {
  99. case MessageType::MSG_NEW_CONN: {
  100. if (connectionEventCallback_) {
  101. connectionEventCallback_->onConnectionDequeuedByAcceptorCallback(
  102. msg.fd, msg.address);
  103. }
  104. callback_->connectionAccepted(msg.fd, msg.address);
  105. break;
  106. }
  107. case MessageType::MSG_ERROR: {
  108. std::runtime_error ex(msg.msg);
  109. callback_->acceptError(ex);
  110. break;
  111. }
  112. default: {
  113. LOG(ERROR) << "invalid accept notification message type "
  114. << int(msg.type);
  115. std::runtime_error ex(
  116. "received invalid accept notification message type");
  117. callback_->acceptError(ex);
  118. }
  119. }
  120. }
  121. /*
  122. * AsyncServerSocket::BackoffTimeout
  123. */
  124. class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
  125. public:
  126. // Disallow copy, move, and default constructors.
  127. BackoffTimeout(BackoffTimeout&&) = delete;
  128. explicit BackoffTimeout(AsyncServerSocket* socket)
  129. : AsyncTimeout(socket->getEventBase()), socket_(socket) {}
  130. void timeoutExpired() noexcept override {
  131. socket_->backoffTimeoutExpired();
  132. }
  133. private:
  134. AsyncServerSocket* socket_;
  135. };
  136. /*
  137. * AsyncServerSocket methods
  138. */
  139. AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
  140. : eventBase_(eventBase),
  141. accepting_(false),
  142. maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
  143. maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
  144. acceptRateAdjustSpeed_(0),
  145. acceptRate_(1),
  146. lastAccepTimestamp_(std::chrono::steady_clock::now()),
  147. numDroppedConnections_(0),
  148. callbackIndex_(0),
  149. backoffTimeout_(nullptr),
  150. callbacks_(),
  151. keepAliveEnabled_(true),
  152. closeOnExec_(true) {
  153. disableTransparentTls();
  154. }
  155. void AsyncServerSocket::setShutdownSocketSet(
  156. const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
  157. const auto newSS = wNewSS.lock();
  158. const auto shutdownSocketSet = wShutdownSocketSet_.lock();
  159. if (shutdownSocketSet == newSS) {
  160. return;
  161. }
  162. if (shutdownSocketSet) {
  163. for (auto& h : sockets_) {
  164. shutdownSocketSet->remove(h.socket_);
  165. }
  166. }
  167. if (newSS) {
  168. for (auto& h : sockets_) {
  169. newSS->add(h.socket_);
  170. }
  171. }
  172. wShutdownSocketSet_ = wNewSS;
  173. }
  174. AsyncServerSocket::~AsyncServerSocket() {
  175. assert(callbacks_.empty());
  176. }
  177. int AsyncServerSocket::stopAccepting(int shutdownFlags) {
  178. int result = 0;
  179. for (auto& handler : sockets_) {
  180. VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_;
  181. }
  182. if (eventBase_) {
  183. eventBase_->dcheckIsInEventBaseThread();
  184. }
  185. // When destroy is called, unregister and close the socket immediately.
  186. accepting_ = false;
  187. // Close the sockets in reverse order as they were opened to avoid
  188. // the condition where another process concurrently tries to open
  189. // the same port, succeed to bind the first socket but fails on the
  190. // second because it hasn't been closed yet.
  191. for (; !sockets_.empty(); sockets_.pop_back()) {
  192. auto& handler = sockets_.back();
  193. handler.unregisterHandler();
  194. if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
  195. shutdownSocketSet->close(handler.socket_);
  196. } else if (shutdownFlags >= 0) {
  197. result = shutdownNoInt(handler.socket_, shutdownFlags);
  198. pendingCloseSockets_.push_back(handler.socket_);
  199. } else {
  200. closeNoInt(handler.socket_);
  201. }
  202. }
  203. // Destroy the backoff timout. This will cancel it if it is running.
  204. delete backoffTimeout_;
  205. backoffTimeout_ = nullptr;
  206. // Close all of the callback queues to notify them that they are being
  207. // destroyed. No one should access the AsyncServerSocket any more once
  208. // destroy() is called. However, clear out callbacks_ before invoking the
  209. // accept callbacks just in case. This will potentially help us detect the
  210. // bug if one of the callbacks calls addAcceptCallback() or
  211. // removeAcceptCallback().
  212. std::vector<CallbackInfo> callbacksCopy;
  213. callbacks_.swap(callbacksCopy);
  214. for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
  215. it != callbacksCopy.end();
  216. ++it) {
  217. // consumer may not be set if we are running in primary event base
  218. if (it->consumer) {
  219. DCHECK(it->eventBase);
  220. it->consumer->stop(it->eventBase, it->callback);
  221. } else {
  222. DCHECK(it->callback);
  223. it->callback->acceptStopped();
  224. }
  225. }
  226. return result;
  227. }
  228. void AsyncServerSocket::destroy() {
  229. stopAccepting();
  230. for (auto s : pendingCloseSockets_) {
  231. closeNoInt(s);
  232. }
  233. // Then call DelayedDestruction::destroy() to take care of
  234. // whether or not we need immediate or delayed destruction
  235. DelayedDestruction::destroy();
  236. }
  237. void AsyncServerSocket::attachEventBase(EventBase* eventBase) {
  238. assert(eventBase_ == nullptr);
  239. eventBase->dcheckIsInEventBaseThread();
  240. eventBase_ = eventBase;
  241. for (auto& handler : sockets_) {
  242. handler.attachEventBase(eventBase);
  243. }
  244. }
  245. void AsyncServerSocket::detachEventBase() {
  246. assert(eventBase_ != nullptr);
  247. eventBase_->dcheckIsInEventBaseThread();
  248. assert(!accepting_);
  249. eventBase_ = nullptr;
  250. for (auto& handler : sockets_) {
  251. handler.detachEventBase();
  252. }
  253. }
  254. void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
  255. if (eventBase_) {
  256. eventBase_->dcheckIsInEventBaseThread();
  257. }
  258. if (sockets_.size() > 0) {
  259. throw std::invalid_argument(
  260. "cannot call useExistingSocket() on a "
  261. "AsyncServerSocket that already has a socket");
  262. }
  263. for (auto fd : fds) {
  264. // Set addressFamily_ from this socket.
  265. // Note that the socket may not have been bound yet, but
  266. // setFromLocalAddress() will still work and get the correct address family.
  267. // We will update addressFamily_ again anyway if bind() is called later.
  268. SocketAddress address;
  269. address.setFromLocalAddress(fd);
  270. #if __linux__
  271. if (noTransparentTls_) {
  272. // Ignore return value, errors are ok
  273. setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
  274. }
  275. #endif
  276. setupSocket(fd, address.getFamily());
  277. sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
  278. sockets_.back().changeHandlerFD(fd);
  279. }
  280. }
  281. void AsyncServerSocket::useExistingSocket(int fd) {
  282. useExistingSockets({fd});
  283. }
  284. void AsyncServerSocket::bindSocket(
  285. int fd,
  286. const SocketAddress& address,
  287. bool isExistingSocket) {
  288. sockaddr_storage addrStorage;
  289. address.getAddress(&addrStorage);
  290. sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
  291. if (fsp::bind(fd, saddr, address.getActualSize()) != 0) {
  292. if (!isExistingSocket) {
  293. closeNoInt(fd);
  294. }
  295. folly::throwSystemError(
  296. errno, "failed to bind to async server socket: " + address.describe());
  297. }
  298. #if __linux__
  299. if (noTransparentTls_) {
  300. // Ignore return value, errors are ok
  301. setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
  302. }
  303. #endif
  304. // If we just created this socket, update the EventHandler and set socket_
  305. if (!isExistingSocket) {
  306. sockets_.emplace_back(eventBase_, fd, this, address.getFamily());
  307. }
  308. }
  309. bool AsyncServerSocket::setZeroCopy(bool enable) {
  310. if (msgErrQueueSupported) {
  311. int fd = getSocket();
  312. int val = enable ? 1 : 0;
  313. int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
  314. return (0 == ret);
  315. }
  316. return false;
  317. }
  318. void AsyncServerSocket::bind(const SocketAddress& address) {
  319. if (eventBase_) {
  320. eventBase_->dcheckIsInEventBaseThread();
  321. }
  322. // useExistingSocket() may have been called to initialize socket_ already.
  323. // However, in the normal case we need to create a new socket now.
  324. // Don't set socket_ yet, so that socket_ will remain uninitialized if an
  325. // error occurs.
  326. int fd;
  327. if (sockets_.size() == 0) {
  328. fd = createSocket(address.getFamily());
  329. } else if (sockets_.size() == 1) {
  330. if (address.getFamily() != sockets_[0].addressFamily_) {
  331. throw std::invalid_argument(
  332. "Attempted to bind address to socket with "
  333. "different address family");
  334. }
  335. fd = sockets_[0].socket_;
  336. } else {
  337. throw std::invalid_argument("Attempted to bind to multiple fds");
  338. }
  339. bindSocket(fd, address, !sockets_.empty());
  340. }
  341. void AsyncServerSocket::bind(
  342. const std::vector<IPAddress>& ipAddresses,
  343. uint16_t port) {
  344. if (ipAddresses.empty()) {
  345. throw std::invalid_argument("No ip addresses were provided");
  346. }
  347. if (!sockets_.empty()) {
  348. throw std::invalid_argument(
  349. "Cannot call bind on a AsyncServerSocket "
  350. "that already has a socket.");
  351. }
  352. for (const IPAddress& ipAddress : ipAddresses) {
  353. SocketAddress address(ipAddress.toFullyQualified(), port);
  354. int fd = createSocket(address.getFamily());
  355. bindSocket(fd, address, false);
  356. }
  357. if (sockets_.size() == 0) {
  358. throw std::runtime_error(
  359. "did not bind any async server socket for port and addresses");
  360. }
  361. }
  /**
   * Bind to `port` on every wildcard address getaddrinfo() reports, handling
   * IPv6 addresses before the others.
   *
   * When port == 0 the port chosen for the IPv6 socket is reused for the
   * remaining families. If that fails, all sockets opened so far are closed and
   * the procedure restarts, up to kNumTries attempts.
   *
   * @throws std::invalid_argument if the initial getaddrinfo() call fails
   * @throws std::system_error if binding fails and no retry applies
   * @throws std::runtime_error if no socket ended up bound
   */
  void AsyncServerSocket::bind(uint16_t port) {
    struct addrinfo hints, *res0;
    char sport[sizeof("65536")];

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
    snprintf(sport, sizeof(sport), "%u", port);

    // On Windows the value we need to pass to bind to all available
    // addresses is an empty string. Everywhere else, it's nullptr.
    constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr;
    if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) {
      throw std::invalid_argument(
          "Attempted to bind address to socket with "
          "bad getaddrinfo");
    }

    SCOPE_EXIT {
      freeaddrinfo(res0);
    };

    // Create, configure and bind one socket for a getaddrinfo() result, then
    // record it in sockets_. Every failure path closes the descriptor.
    auto setupAddress = [&](struct addrinfo* res) {
      int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
      // IPv6/IPv4 may not be supported by the kernel
      if (s < 0 && errno == EAFNOSUPPORT) {
        return;
      }
      CHECK_GE(s, 0);

      try {
        setupSocket(s, res->ai_family);
      } catch (...) {
        closeNoInt(s);
        throw;
      }

      if (res->ai_family == AF_INET6) {
        int v6only = 1;
        CHECK(
            0 ==
            setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)));
      }

      // Bind to the socket
      if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) {
        // The descriptor is not in sockets_ yet, so nobody else will close it.
        // Close it here (previously it leaked on every failed bind, including
        // each port-0 retry) and restore errno, which close may overwrite.
        const int bindErrno = errno;
        closeNoInt(s);
        errno = bindErrno;
        folly::throwSystemError(
            errno,
            "failed to bind to async server socket for port ",
            SocketAddress::getPortFrom(res->ai_addr),
            " family ",
            SocketAddress::getFamilyNameFrom(res->ai_addr, "<unknown>"));
      }

  #if __linux__
      if (noTransparentTls_) {
        // Ignore return value, errors are ok
        setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
      }
  #endif

      SocketAddress address;
      address.setFromLocalAddress(s);

      sockets_.emplace_back(eventBase_, s, this, address.getFamily());
    };

    const int kNumTries = 25;
    for (int tries = 1; true; tries++) {
      // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo
      // should return IPv6 first and then IPv4 addresses, but glibc's
      // getaddrinfo(nullptr) with AI_PASSIVE returns:
      // - 0.0.0.0 (IPv4-only)
      // - :: (IPv6+IPv4) in this order
      // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981
      for (struct addrinfo* res = res0; res; res = res->ai_next) {
        if (res->ai_family == AF_INET6) {
          setupAddress(res);
        }
      }

      // If port == 0, then we should try to bind to the same port on ipv4 and
      // ipv6. So if we did bind to ipv6, figure out that port and use it.
      if (sockets_.size() == 1 && port == 0) {
        SocketAddress address;
        address.setFromLocalAddress(sockets_.back().socket_);
        snprintf(sport, sizeof(sport), "%u", address.getPort());
        freeaddrinfo(res0);
        CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
      }

      try {
        for (struct addrinfo* res = res0; res; res = res->ai_next) {
          if (res->ai_family != AF_INET6) {
            setupAddress(res);
          }
        }
      } catch (const std::system_error&) {
        // If we can't bind to the same port on ipv4 as ipv6 when using
        // port=0 then we will retry again before giving up after
        // kNumTries attempts. We do this by closing the sockets that
        // were opened, then restarting from scratch.
        if (port == 0 && !sockets_.empty() && tries != kNumTries) {
          for (const auto& socket : sockets_) {
            if (socket.socket_ <= 0) {
              continue;
            } else if (
                const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
              shutdownSocketSet->close(socket.socket_);
            } else {
              closeNoInt(socket.socket_);
            }
          }
          sockets_.clear();
          snprintf(sport, sizeof(sport), "%u", port);
          freeaddrinfo(res0);
          CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0));
          continue;
        }

        throw;
      }

      break;
    }

    if (sockets_.size() == 0) {
      throw std::runtime_error("did not bind any async server socket for port");
    }
  }
  477. void AsyncServerSocket::listen(int backlog) {
  478. if (eventBase_) {
  479. eventBase_->dcheckIsInEventBaseThread();
  480. }
  481. // Start listening
  482. for (auto& handler : sockets_) {
  483. if (fsp::listen(handler.socket_, backlog) == -1) {
  484. folly::throwSystemError(errno, "failed to listen on async server socket");
  485. }
  486. }
  487. }
  488. void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
  489. CHECK(sockets_.size() >= 1);
  490. VLOG_IF(2, sockets_.size() > 1)
  491. << "Warning: getAddress() called and multiple addresses available ("
  492. << sockets_.size() << "). Returning only the first one.";
  493. addressReturn->setFromLocalAddress(sockets_[0].socket_);
  494. }
  495. std::vector<SocketAddress> AsyncServerSocket::getAddresses() const {
  496. CHECK(sockets_.size() >= 1);
  497. auto tsaVec = std::vector<SocketAddress>(sockets_.size());
  498. auto tsaIter = tsaVec.begin();
  499. for (const auto& socket : sockets_) {
  500. (tsaIter++)->setFromLocalAddress(socket.socket_);
  501. };
  502. return tsaVec;
  503. }
  504. void AsyncServerSocket::addAcceptCallback(
  505. AcceptCallback* callback,
  506. EventBase* eventBase,
  507. uint32_t maxAtOnce) {
  508. if (eventBase_) {
  509. eventBase_->dcheckIsInEventBaseThread();
  510. }
  511. // If this is the first accept callback and we are supposed to be accepting,
  512. // start accepting once the callback is installed.
  513. bool runStartAccepting = accepting_ && callbacks_.empty();
  514. callbacks_.emplace_back(callback, eventBase);
  515. SCOPE_SUCCESS {
  516. // If this is the first accept callback and we are supposed to be accepting,
  517. // start accepting.
  518. if (runStartAccepting) {
  519. startAccepting();
  520. }
  521. };
  522. if (!eventBase) {
  523. // Run in AsyncServerSocket's eventbase; notify that we are
  524. // starting to accept connections
  525. callback->acceptStarted();
  526. return;
  527. }
  528. // Start the remote acceptor.
  529. //
  530. // It would be nice if we could avoid starting the remote acceptor if
  531. // eventBase == eventBase_. However, that would cause issues if
  532. // detachEventBase() and attachEventBase() were ever used to change the
  533. // primary EventBase for the server socket. Therefore we require the caller
  534. // to specify a nullptr EventBase if they want to ensure that the callback is
  535. // always invoked in the primary EventBase, and to be able to invoke that
  536. // callback more efficiently without having to use a notification queue.
  537. RemoteAcceptor* acceptor = nullptr;
  538. try {
  539. acceptor = new RemoteAcceptor(callback, connectionEventCallback_);
  540. acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
  541. } catch (...) {
  542. callbacks_.pop_back();
  543. delete acceptor;
  544. throw;
  545. }
  546. callbacks_.back().consumer = acceptor;
  547. }
  548. void AsyncServerSocket::removeAcceptCallback(
  549. AcceptCallback* callback,
  550. EventBase* eventBase) {
  551. if (eventBase_) {
  552. eventBase_->dcheckIsInEventBaseThread();
  553. }
  554. // Find the matching AcceptCallback.
  555. // We just do a simple linear search; we don't expect removeAcceptCallback()
  556. // to be called frequently, and we expect there to only be a small number of
  557. // callbacks anyway.
  558. std::vector<CallbackInfo>::iterator it = callbacks_.begin();
  559. uint32_t n = 0;
  560. while (true) {
  561. if (it == callbacks_.end()) {
  562. throw std::runtime_error(
  563. "AsyncServerSocket::removeAcceptCallback(): "
  564. "accept callback not found");
  565. }
  566. if (it->callback == callback &&
  567. (it->eventBase == eventBase || eventBase == nullptr)) {
  568. break;
  569. }
  570. ++it;
  571. ++n;
  572. }
  573. // Remove this callback from callbacks_.
  574. //
  575. // Do this before invoking the acceptStopped() callback, in case
  576. // acceptStopped() invokes one of our methods that examines callbacks_.
  577. //
  578. // Save a copy of the CallbackInfo first.
  579. CallbackInfo info(*it);
  580. callbacks_.erase(it);
  581. if (n < callbackIndex_) {
  582. // We removed an element before callbackIndex_. Move callbackIndex_ back
  583. // one step, since things after n have been shifted back by 1.
  584. --callbackIndex_;
  585. } else {
  586. // We removed something at or after callbackIndex_.
  587. // If we removed the last element and callbackIndex_ was pointing at it,
  588. // we need to reset callbackIndex_ to 0.
  589. if (callbackIndex_ >= callbacks_.size()) {
  590. callbackIndex_ = 0;
  591. }
  592. }
  593. if (info.consumer) {
  594. // consumer could be nullptr is we run callbacks in primary event
  595. // base
  596. DCHECK(info.eventBase);
  597. info.consumer->stop(info.eventBase, info.callback);
  598. } else {
  599. // callback invoked in the primary event base, just call directly
  600. DCHECK(info.callback);
  601. callback->acceptStopped();
  602. }
  603. // If we are supposed to be accepting but the last accept callback
  604. // was removed, unregister for events until a callback is added.
  605. if (accepting_ && callbacks_.empty()) {
  606. for (auto& handler : sockets_) {
  607. handler.unregisterHandler();
  608. }
  609. }
  610. }
  611. void AsyncServerSocket::startAccepting() {
  612. if (eventBase_) {
  613. eventBase_->dcheckIsInEventBaseThread();
  614. }
  615. accepting_ = true;
  616. if (callbacks_.empty()) {
  617. // We can't actually begin accepting if no callbacks are defined.
  618. // Wait until a callback is added to start accepting.
  619. return;
  620. }
  621. for (auto& handler : sockets_) {
  622. if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
  623. throw std::runtime_error("failed to register for accept events");
  624. }
  625. }
  626. }
  627. void AsyncServerSocket::pauseAccepting() {
  628. if (eventBase_) {
  629. eventBase_->dcheckIsInEventBaseThread();
  630. }
  631. accepting_ = false;
  632. for (auto& handler : sockets_) {
  633. handler.unregisterHandler();
  634. }
  635. // If we were in the accept backoff state, disable the backoff timeout
  636. if (backoffTimeout_) {
  637. backoffTimeout_->cancelTimeout();
  638. }
  639. }
  640. int AsyncServerSocket::createSocket(int family) {
  641. int fd = fsp::socket(family, SOCK_STREAM, 0);
  642. if (fd == -1) {
  643. folly::throwSystemError(errno, "error creating async server socket");
  644. }
  645. try {
  646. setupSocket(fd, family);
  647. } catch (...) {
  648. closeNoInt(fd);
  649. throw;
  650. }
  651. return fd;
  652. }
  653. /**
  654. * Enable/Disable TOS reflection for the server socket
  655. * If enabled, the 'accepted' connections will reflect the
  656. * TOS derived from the client's connect request
  657. */
  658. void AsyncServerSocket::setTosReflect(bool enable) {
  659. if (!kIsLinux || enable == false) {
  660. tosReflect_ = false;
  661. return;
  662. }
  663. for (auto& handler : sockets_) {
  664. if (handler.socket_ < 0) {
  665. continue;
  666. }
  667. int val = (enable) ? 1 : 0;
  668. int ret = setsockopt(
  669. handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val));
  670. if (ret == 0) {
  671. VLOG(10) << "Enabled SYN save for socket " << handler.socket_;
  672. } else {
  673. folly::throwSystemError(errno, "failed to enable TOS reflect");
  674. }
  675. }
  676. tosReflect_ = true;
  677. }
  678. void AsyncServerSocket::setupSocket(int fd, int family) {
  679. // Put the socket in non-blocking mode
  680. if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
  681. folly::throwSystemError(errno, "failed to put socket in non-blocking mode");
  682. }
  683. // Set reuseaddr to avoid 2MSL delay on server restart
  684. int one = 1;
  685. if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
  686. // This isn't a fatal error; just log an error message and continue
  687. LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
  688. }
  689. // Set reuseport to support multiple accept threads
  690. int zero = 0;
  691. if (reusePortEnabled_ &&
  692. setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) {
  693. LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket "
  694. << errnoStr(errno);
  695. #ifdef WIN32
  696. folly::throwSystemError(errno, "failed to bind to the async server socket");
  697. #else
  698. SocketAddress address;
  699. address.setFromLocalAddress(fd);
  700. folly::throwSystemError(
  701. errno, "failed to bind to async server socket: " + address.describe());
  702. #endif
  703. }
  704. // Set keepalive as desired
  705. if (setsockopt(
  706. fd,
  707. SOL_SOCKET,
  708. SO_KEEPALIVE,
  709. (keepAliveEnabled_) ? &one : &zero,
  710. sizeof(int)) != 0) {
  711. LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: "
  712. << errnoStr(errno);
  713. }
  714. // Setup FD_CLOEXEC flag
  715. if (closeOnExec_ && (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
  716. LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: "
  717. << errnoStr(errno);
  718. }
  719. // Set TCP nodelay if available, MAC OS X Hack
  720. // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  721. #ifndef TCP_NOPUSH
  722. if (family != AF_UNIX) {
  723. if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
  724. // This isn't a fatal error; just log an error message and continue
  725. LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: "
  726. << errnoStr(errno);
  727. }
  728. }
  729. #else
  730. (void)family; // to avoid unused parameter warning
  731. #endif
  732. #if FOLLY_ALLOW_TFO
  733. if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) {
  734. // This isn't a fatal error; just log an error message and continue
  735. LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: "
  736. << folly::errnoStr(errno);
  737. }
  738. #endif
  739. if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
  740. shutdownSocketSet->add(fd);
  741. }
  742. }
  743. void AsyncServerSocket::handlerReady(
  744. uint16_t /* events */,
  745. int fd,
  746. sa_family_t addressFamily) noexcept {
  747. assert(!callbacks_.empty());
  748. DestructorGuard dg(this);
  749. // Only accept up to maxAcceptAtOnce_ connections at a time,
  750. // to avoid starving other I/O handlers using this EventBase.
  751. for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
  752. SocketAddress address;
  753. sockaddr_storage addrStorage;
  754. socklen_t addrLen = sizeof(addrStorage);
  755. sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
  756. // In some cases, accept() doesn't seem to update these correctly.
  757. saddr->sa_family = addressFamily;
  758. if (addressFamily == AF_UNIX) {
  759. addrLen = sizeof(struct sockaddr_un);
  760. }
  761. // Accept a new client socket
  762. #ifdef SOCK_NONBLOCK
  763. int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
  764. #else
  765. int clientSocket = accept(fd, saddr, &addrLen);
  766. #endif
  767. address.setFromSockaddr(saddr, addrLen);
  768. if (clientSocket >= 0 && connectionEventCallback_) {
  769. connectionEventCallback_->onConnectionAccepted(clientSocket, address);
  770. }
  771. // Connection accepted, get the SYN packet from the client if
  772. // TOS reflect is enabled
  773. if (kIsLinux && clientSocket >= 0 && tosReflect_) {
  774. std::array<uint32_t, 64> buffer;
  775. socklen_t len = sizeof(buffer);
  776. int ret =
  777. getsockopt(clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len);
  778. if (ret == 0) {
  779. uint32_t tosWord = folly::Endian::big(buffer[0]);
  780. if (addressFamily == AF_INET6) {
  781. tosWord = (tosWord & 0x0FC00000) >> 20;
  782. ret = setsockopt(
  783. clientSocket,
  784. IPPROTO_IPV6,
  785. IPV6_TCLASS,
  786. &tosWord,
  787. sizeof(tosWord));
  788. } else if (addressFamily == AF_INET) {
  789. tosWord = (tosWord & 0x00FC0000) >> 16;
  790. ret = setsockopt(
  791. clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord));
  792. }
  793. if (ret != 0) {
  794. LOG(ERROR) << "Unable to set TOS for accepted socket "
  795. << clientSocket;
  796. }
  797. } else {
  798. LOG(ERROR) << "Unable to get SYN packet for accepted socket "
  799. << clientSocket;
  800. }
  801. }
  802. std::chrono::time_point<std::chrono::steady_clock> nowMs =
  803. std::chrono::steady_clock::now();
  804. auto timeSinceLastAccept = std::max<int64_t>(
  805. 0,
  806. nowMs.time_since_epoch().count() -
  807. lastAccepTimestamp_.time_since_epoch().count());
  808. lastAccepTimestamp_ = nowMs;
  809. if (acceptRate_ < 1) {
  810. acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
  811. if (acceptRate_ >= 1) {
  812. acceptRate_ = 1;
  813. } else if (rand() > acceptRate_ * RAND_MAX) {
  814. ++numDroppedConnections_;
  815. if (clientSocket >= 0) {
  816. closeNoInt(clientSocket);
  817. if (connectionEventCallback_) {
  818. connectionEventCallback_->onConnectionDropped(
  819. clientSocket, address);
  820. }
  821. }
  822. continue;
  823. }
  824. }
  825. if (clientSocket < 0) {
  826. if (errno == EAGAIN) {
  827. // No more sockets to accept right now.
  828. // Check for this code first, since it's the most common.
  829. return;
  830. } else if (errno == EMFILE || errno == ENFILE) {
  831. // We're out of file descriptors. Perhaps we're accepting connections
  832. // too quickly. Pause accepting briefly to back off and give the server
  833. // a chance to recover.
  834. LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
  835. "back-off state";
  836. enterBackoff();
  837. // Dispatch the error message
  838. dispatchError("accept() failed", errno);
  839. } else {
  840. dispatchError("accept() failed", errno);
  841. }
  842. if (connectionEventCallback_) {
  843. connectionEventCallback_->onConnectionAcceptError(errno);
  844. }
  845. return;
  846. }
  847. #ifndef SOCK_NONBLOCK
  848. // Explicitly set the new connection to non-blocking mode
  849. if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
  850. closeNoInt(clientSocket);
  851. dispatchError(
  852. "failed to set accepted socket to non-blocking mode", errno);
  853. if (connectionEventCallback_) {
  854. connectionEventCallback_->onConnectionDropped(clientSocket, address);
  855. }
  856. return;
  857. }
  858. #endif
  859. // Inform the callback about the new connection
  860. dispatchSocket(clientSocket, std::move(address));
  861. // If we aren't accepting any more, break out of the loop
  862. if (!accepting_ || callbacks_.empty()) {
  863. break;
  864. }
  865. }
  866. }
  867. void AsyncServerSocket::dispatchSocket(int socket, SocketAddress&& address) {
  868. uint32_t startingIndex = callbackIndex_;
  869. // Short circuit if the callback is in the primary EventBase thread
  870. CallbackInfo* info = nextCallback();
  871. if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
  872. info->callback->connectionAccepted(socket, address);
  873. return;
  874. }
  875. const SocketAddress addr(address);
  876. // Create a message to send over the notification queue
  877. QueueMessage msg;
  878. msg.type = MessageType::MSG_NEW_CONN;
  879. msg.address = std::move(address);
  880. msg.fd = socket;
  881. // Loop until we find a free queue to write to
  882. while (true) {
  883. if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
  884. if (connectionEventCallback_) {
  885. connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback(
  886. socket, addr);
  887. }
  888. // Success! return.
  889. return;
  890. }
  891. // We couldn't add to queue. Fall through to below
  892. ++numDroppedConnections_;
  893. if (acceptRateAdjustSpeed_ > 0) {
  894. // aggressively decrease accept rate when in trouble
  895. static const double kAcceptRateDecreaseSpeed = 0.1;
  896. acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
  897. }
  898. if (callbackIndex_ == startingIndex) {
  899. // The notification queue was full
  900. // We can't really do anything at this point other than close the socket.
  901. //
  902. // This should only happen if a user's service is behaving extremely
  903. // badly and none of the EventBase threads are looping fast enough to
  904. // process the incoming connections. If the service is overloaded, it
  905. // should use pauseAccepting() to temporarily back off accepting new
  906. // connections, before they reach the point where their threads can't
  907. // even accept new messages.
  908. LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:"
  909. << " all accept callback queues are full";
  910. closeNoInt(socket);
  911. if (connectionEventCallback_) {
  912. connectionEventCallback_->onConnectionDropped(socket, addr);
  913. }
  914. return;
  915. }
  916. info = nextCallback();
  917. }
  918. }
  919. void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) {
  920. uint32_t startingIndex = callbackIndex_;
  921. CallbackInfo* info = nextCallback();
  922. // Create a message to send over the notification queue
  923. QueueMessage msg;
  924. msg.type = MessageType::MSG_ERROR;
  925. msg.err = errnoValue;
  926. msg.msg = std::move(msgstr);
  927. while (true) {
  928. // Short circuit if the callback is in the primary EventBase thread
  929. if (info->eventBase == nullptr || info->eventBase == this->eventBase_) {
  930. std::runtime_error ex(
  931. std::string(msgstr) + folly::to<std::string>(errnoValue));
  932. info->callback->acceptError(ex);
  933. return;
  934. }
  935. if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
  936. return;
  937. }
  938. // Fall through and try another callback
  939. if (callbackIndex_ == startingIndex) {
  940. // The notification queues for all of the callbacks were full.
  941. // We can't really do anything at this point.
  942. LOG_EVERY_N(ERROR, 100)
  943. << "failed to dispatch accept error: all accept"
  944. << " callback queues are full: error msg: " << msg.msg << ": "
  945. << errnoValue;
  946. return;
  947. }
  948. info = nextCallback();
  949. }
  950. }
  951. void AsyncServerSocket::enterBackoff() {
  952. // If this is the first time we have entered the backoff state,
  953. // allocate backoffTimeout_.
  954. if (backoffTimeout_ == nullptr) {
  955. try {
  956. backoffTimeout_ = new BackoffTimeout(this);
  957. } catch (const std::bad_alloc&) {
  958. // Man, we couldn't even allocate the timer to re-enable accepts.
  959. // We must be in pretty bad shape. Don't pause accepting for now,
  960. // since we won't be able to re-enable ourselves later.
  961. LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
  962. << " timer; unable to temporarly pause accepting";
  963. if (connectionEventCallback_) {
  964. connectionEventCallback_->onBackoffError();
  965. }
  966. return;
  967. }
  968. }
  969. // For now, we simply pause accepting for 1 second.
  970. //
  971. // We could add some smarter backoff calculation here in the future. (e.g.,
  972. // start sleeping for longer if we keep hitting the backoff frequently.)
  973. // Typically the user needs to figure out why the server is overloaded and
  974. // fix it in some other way, though. The backoff timer is just a simple
  975. // mechanism to try and give the connection processing code a little bit of
  976. // breathing room to catch up, and to avoid just spinning and failing to
  977. // accept over and over again.
  978. const uint32_t timeoutMS = 1000;
  979. if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
  980. LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
  981. << "unable to temporarly pause accepting";
  982. if (connectionEventCallback_) {
  983. connectionEventCallback_->onBackoffError();
  984. }
  985. return;
  986. }
  987. // The backoff timer is scheduled to re-enable accepts.
  988. // Go ahead and disable accepts for now. We leave accepting_ set to true,
  989. // since that tracks the desired state requested by the user.
  990. for (auto& handler : sockets_) {
  991. handler.unregisterHandler();
  992. }
  993. if (connectionEventCallback_) {
  994. connectionEventCallback_->onBackoffStarted();
  995. }
  996. }
  997. void AsyncServerSocket::backoffTimeoutExpired() {
  998. // accepting_ should still be true.
  999. // If pauseAccepting() was called while in the backoff state it will cancel
  1000. // the backoff timeout.
  1001. assert(accepting_);
  1002. // We can't be detached from the EventBase without being paused
  1003. assert(eventBase_ != nullptr);
  1004. eventBase_->dcheckIsInEventBaseThread();
  1005. // If all of the callbacks were removed, we shouldn't re-enable accepts
  1006. if (callbacks_.empty()) {
  1007. if (connectionEventCallback_) {
  1008. connectionEventCallback_->onBackoffEnded();
  1009. }
  1010. return;
  1011. }
  1012. // Register the handler.
  1013. for (auto& handler : sockets_) {
  1014. if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) {
  1015. // We're hosed. We could just re-schedule backoffTimeout_ to
  1016. // re-try again after a little bit. However, we don't want to
  1017. // loop retrying forever if we can't re-enable accepts. Just
  1018. // abort the entire program in this state; things are really bad
  1019. // and restarting the entire server is probably the best remedy.
  1020. LOG(ERROR)
  1021. << "failed to re-enable AsyncServerSocket accepts after backoff; "
  1022. << "crashing now";
  1023. abort();
  1024. }
  1025. }
  1026. if (connectionEventCallback_) {
  1027. connectionEventCallback_->onBackoffEnded();
  1028. }
  1029. }
  1030. } // namespace folly