/* * Copyright 2014-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace fsp = folly::portability::sockets; namespace folly { #ifndef TCP_SAVE_SYN #define TCP_SAVE_SYN 27 #endif #ifndef TCP_SAVED_SYN #define TCP_SAVED_SYN 28 #endif static constexpr bool msgErrQueueSupported = #ifdef FOLLY_HAVE_MSG_ERRQUEUE true; #else false; #endif // FOLLY_HAVE_MSG_ERRQUEUE const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue; int setCloseOnExec(int fd, int value) { // Read the current flags int old_flags = fcntl(fd, F_GETFD, 0); // If reading the flags failed, return error indication now if (old_flags < 0) { return -1; } // Set just the flag we want to set int new_flags; if (value != 0) { new_flags = old_flags | FD_CLOEXEC; } else { new_flags = old_flags & ~FD_CLOEXEC; } // Store modified flag word in the descriptor return fcntl(fd, F_SETFD, new_flags); } void AsyncServerSocket::RemoteAcceptor::start( EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) { setMaxReadAtOnce(maxAtOnce); queue_.setMaxQueueSize(maxInQueue); if (!eventBase->runInEventBaseThread([=]() { callback_->acceptStarted(); this->startConsuming(eventBase, &queue_); })) { throw std::invalid_argument( "unable to start waiting on accept " "notification queue in the specified " "EventBase thread"); } } void AsyncServerSocket::RemoteAcceptor::stop( EventBase* eventBase, AcceptCallback* callback) { if (!eventBase->runInEventBaseThread([=]() { callback->acceptStopped(); delete this; })) { throw std::invalid_argument( "unable to start waiting on accept " "notification queue in the specified " "EventBase thread"); } } void AsyncServerSocket::RemoteAcceptor::messageAvailable( QueueMessage&& msg) noexcept { switch (msg.type) { case MessageType::MSG_NEW_CONN: { if (connectionEventCallback_) { connectionEventCallback_->onConnectionDequeuedByAcceptorCallback( msg.fd, msg.address); } callback_->connectionAccepted(msg.fd, msg.address); break; } case MessageType::MSG_ERROR: { std::runtime_error ex(msg.msg); callback_->acceptError(ex); break; } default: { LOG(ERROR) << "invalid accept notification message type " << int(msg.type); std::runtime_error ex( "received invalid accept notification message type"); callback_->acceptError(ex); } } } /* * AsyncServerSocket::BackoffTimeout */ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout { public: // Disallow copy, move, and default constructors. BackoffTimeout(BackoffTimeout&&) = delete; explicit BackoffTimeout(AsyncServerSocket* socket) : AsyncTimeout(socket->getEventBase()), socket_(socket) {} void timeoutExpired() noexcept override { socket_->backoffTimeoutExpired(); } private: AsyncServerSocket* socket_; }; /* * AsyncServerSocket methods */ AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) : eventBase_(eventBase), accepting_(false), maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce), maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue), acceptRateAdjustSpeed_(0), acceptRate_(1), lastAccepTimestamp_(std::chrono::steady_clock::now()), numDroppedConnections_(0), callbackIndex_(0), backoffTimeout_(nullptr), callbacks_(), keepAliveEnabled_(true), closeOnExec_(true) { disableTransparentTls(); } void AsyncServerSocket::setShutdownSocketSet( const std::weak_ptr& wNewSS) { const auto newSS = wNewSS.lock(); const auto shutdownSocketSet = wShutdownSocketSet_.lock(); if (shutdownSocketSet == newSS) { return; } if (shutdownSocketSet) { for (auto& h : sockets_) { shutdownSocketSet->remove(h.socket_); } } if (newSS) { for (auto& h : sockets_) { newSS->add(h.socket_); } } wShutdownSocketSet_ = wNewSS; } AsyncServerSocket::~AsyncServerSocket() { assert(callbacks_.empty()); } int AsyncServerSocket::stopAccepting(int shutdownFlags) { int result = 0; for (auto& handler : sockets_) { VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_; } if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } // When destroy is called, unregister and close the socket immediately. accepting_ = false; // Close the sockets in reverse order as they were opened to avoid // the condition where another process concurrently tries to open // the same port, succeed to bind the first socket but fails on the // second because it hasn't been closed yet. for (; !sockets_.empty(); sockets_.pop_back()) { auto& handler = sockets_.back(); handler.unregisterHandler(); if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { shutdownSocketSet->close(handler.socket_); } else if (shutdownFlags >= 0) { result = shutdownNoInt(handler.socket_, shutdownFlags); pendingCloseSockets_.push_back(handler.socket_); } else { closeNoInt(handler.socket_); } } // Destroy the backoff timout. This will cancel it if it is running. delete backoffTimeout_; backoffTimeout_ = nullptr; // Close all of the callback queues to notify them that they are being // destroyed. No one should access the AsyncServerSocket any more once // destroy() is called. However, clear out callbacks_ before invoking the // accept callbacks just in case. This will potentially help us detect the // bug if one of the callbacks calls addAcceptCallback() or // removeAcceptCallback(). std::vector callbacksCopy; callbacks_.swap(callbacksCopy); for (std::vector::iterator it = callbacksCopy.begin(); it != callbacksCopy.end(); ++it) { // consumer may not be set if we are running in primary event base if (it->consumer) { DCHECK(it->eventBase); it->consumer->stop(it->eventBase, it->callback); } else { DCHECK(it->callback); it->callback->acceptStopped(); } } return result; } void AsyncServerSocket::destroy() { stopAccepting(); for (auto s : pendingCloseSockets_) { closeNoInt(s); } // Then call DelayedDestruction::destroy() to take care of // whether or not we need immediate or delayed destruction DelayedDestruction::destroy(); } void AsyncServerSocket::attachEventBase(EventBase* eventBase) { assert(eventBase_ == nullptr); eventBase->dcheckIsInEventBaseThread(); eventBase_ = eventBase; for (auto& handler : sockets_) { handler.attachEventBase(eventBase); } } void AsyncServerSocket::detachEventBase() { assert(eventBase_ != nullptr); eventBase_->dcheckIsInEventBaseThread(); assert(!accepting_); eventBase_ = nullptr; for (auto& handler : sockets_) { handler.detachEventBase(); } } void AsyncServerSocket::useExistingSockets(const std::vector& fds) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } if (sockets_.size() > 0) { throw std::invalid_argument( "cannot call useExistingSocket() on a " "AsyncServerSocket that already has a socket"); } for (auto fd : fds) { // Set addressFamily_ from this socket. // Note that the socket may not have been bound yet, but // setFromLocalAddress() will still work and get the correct address family. // We will update addressFamily_ again anyway if bind() is called later. SocketAddress address; address.setFromLocalAddress(fd); #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif setupSocket(fd, address.getFamily()); sockets_.emplace_back(eventBase_, fd, this, address.getFamily()); sockets_.back().changeHandlerFD(fd); } } void AsyncServerSocket::useExistingSocket(int fd) { useExistingSockets({fd}); } void AsyncServerSocket::bindSocket( int fd, const SocketAddress& address, bool isExistingSocket) { sockaddr_storage addrStorage; address.getAddress(&addrStorage); sockaddr* saddr = reinterpret_cast(&addrStorage); if (fsp::bind(fd, saddr, address.getActualSize()) != 0) { if (!isExistingSocket) { closeNoInt(fd); } folly::throwSystemError( errno, "failed to bind to async server socket: " + address.describe()); } #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok setsockopt(fd, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif // If we just created this socket, update the EventHandler and set socket_ if (!isExistingSocket) { sockets_.emplace_back(eventBase_, fd, this, address.getFamily()); } } bool AsyncServerSocket::setZeroCopy(bool enable) { if (msgErrQueueSupported) { int fd = getSocket(); int val = enable ? 1 : 0; int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val)); return (0 == ret); } return false; } void AsyncServerSocket::bind(const SocketAddress& address) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } // useExistingSocket() may have been called to initialize socket_ already. // However, in the normal case we need to create a new socket now. // Don't set socket_ yet, so that socket_ will remain uninitialized if an // error occurs. int fd; if (sockets_.size() == 0) { fd = createSocket(address.getFamily()); } else if (sockets_.size() == 1) { if (address.getFamily() != sockets_[0].addressFamily_) { throw std::invalid_argument( "Attempted to bind address to socket with " "different address family"); } fd = sockets_[0].socket_; } else { throw std::invalid_argument("Attempted to bind to multiple fds"); } bindSocket(fd, address, !sockets_.empty()); } void AsyncServerSocket::bind( const std::vector& ipAddresses, uint16_t port) { if (ipAddresses.empty()) { throw std::invalid_argument("No ip addresses were provided"); } if (!sockets_.empty()) { throw std::invalid_argument( "Cannot call bind on a AsyncServerSocket " "that already has a socket."); } for (const IPAddress& ipAddress : ipAddresses) { SocketAddress address(ipAddress.toFullyQualified(), port); int fd = createSocket(address.getFamily()); bindSocket(fd, address, false); } if (sockets_.size() == 0) { throw std::runtime_error( "did not bind any async server socket for port and addresses"); } } void AsyncServerSocket::bind(uint16_t port) { struct addrinfo hints, *res0; char sport[sizeof("65536")]; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV; snprintf(sport, sizeof(sport), "%u", port); // On Windows the value we need to pass to bind to all available // addresses is an empty string. Everywhere else, it's nullptr. constexpr const char* kWildcardNode = kIsWindows ? "" : nullptr; if (getaddrinfo(kWildcardNode, sport, &hints, &res0)) { throw std::invalid_argument( "Attempted to bind address to socket with " "bad getaddrinfo"); } SCOPE_EXIT { freeaddrinfo(res0); }; auto setupAddress = [&](struct addrinfo* res) { int s = fsp::socket(res->ai_family, res->ai_socktype, res->ai_protocol); // IPv6/IPv4 may not be supported by the kernel if (s < 0 && errno == EAFNOSUPPORT) { return; } CHECK_GE(s, 0); try { setupSocket(s, res->ai_family); } catch (...) { closeNoInt(s); throw; } if (res->ai_family == AF_INET6) { int v6only = 1; CHECK( 0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only))); } // Bind to the socket if (fsp::bind(s, res->ai_addr, socklen_t(res->ai_addrlen)) != 0) { folly::throwSystemError( errno, "failed to bind to async server socket for port ", SocketAddress::getPortFrom(res->ai_addr), " family ", SocketAddress::getFamilyNameFrom(res->ai_addr, "")); } #if __linux__ if (noTransparentTls_) { // Ignore return value, errors are ok setsockopt(s, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0); } #endif SocketAddress address; address.setFromLocalAddress(s); sockets_.emplace_back(eventBase_, s, this, address.getFamily()); }; const int kNumTries = 25; for (int tries = 1; true; tries++) { // Prefer AF_INET6 addresses. RFC 3484 mandates that getaddrinfo // should return IPv6 first and then IPv4 addresses, but glibc's // getaddrinfo(nullptr) with AI_PASSIVE returns: // - 0.0.0.0 (IPv4-only) // - :: (IPv6+IPv4) in this order // See: https://sourceware.org/bugzilla/show_bug.cgi?id=9981 for (struct addrinfo* res = res0; res; res = res->ai_next) { if (res->ai_family == AF_INET6) { setupAddress(res); } } // If port == 0, then we should try to bind to the same port on ipv4 and // ipv6. So if we did bind to ipv6, figure out that port and use it. if (sockets_.size() == 1 && port == 0) { SocketAddress address; address.setFromLocalAddress(sockets_.back().socket_); snprintf(sport, sizeof(sport), "%u", address.getPort()); freeaddrinfo(res0); CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0)); } try { for (struct addrinfo* res = res0; res; res = res->ai_next) { if (res->ai_family != AF_INET6) { setupAddress(res); } } } catch (const std::system_error&) { // If we can't bind to the same port on ipv4 as ipv6 when using // port=0 then we will retry again before giving up after // kNumTries attempts. We do this by closing the sockets that // were opened, then restarting from scratch. if (port == 0 && !sockets_.empty() && tries != kNumTries) { for (const auto& socket : sockets_) { if (socket.socket_ <= 0) { continue; } else if ( const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { shutdownSocketSet->close(socket.socket_); } else { closeNoInt(socket.socket_); } } sockets_.clear(); snprintf(sport, sizeof(sport), "%u", port); freeaddrinfo(res0); CHECK_EQ(0, getaddrinfo(nullptr, sport, &hints, &res0)); continue; } throw; } break; } if (sockets_.size() == 0) { throw std::runtime_error("did not bind any async server socket for port"); } } void AsyncServerSocket::listen(int backlog) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } // Start listening for (auto& handler : sockets_) { if (fsp::listen(handler.socket_, backlog) == -1) { folly::throwSystemError(errno, "failed to listen on async server socket"); } } } void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const { CHECK(sockets_.size() >= 1); VLOG_IF(2, sockets_.size() > 1) << "Warning: getAddress() called and multiple addresses available (" << sockets_.size() << "). Returning only the first one."; addressReturn->setFromLocalAddress(sockets_[0].socket_); } std::vector AsyncServerSocket::getAddresses() const { CHECK(sockets_.size() >= 1); auto tsaVec = std::vector(sockets_.size()); auto tsaIter = tsaVec.begin(); for (const auto& socket : sockets_) { (tsaIter++)->setFromLocalAddress(socket.socket_); }; return tsaVec; } void AsyncServerSocket::addAcceptCallback( AcceptCallback* callback, EventBase* eventBase, uint32_t maxAtOnce) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } // If this is the first accept callback and we are supposed to be accepting, // start accepting once the callback is installed. bool runStartAccepting = accepting_ && callbacks_.empty(); callbacks_.emplace_back(callback, eventBase); SCOPE_SUCCESS { // If this is the first accept callback and we are supposed to be accepting, // start accepting. if (runStartAccepting) { startAccepting(); } }; if (!eventBase) { // Run in AsyncServerSocket's eventbase; notify that we are // starting to accept connections callback->acceptStarted(); return; } // Start the remote acceptor. // // It would be nice if we could avoid starting the remote acceptor if // eventBase == eventBase_. However, that would cause issues if // detachEventBase() and attachEventBase() were ever used to change the // primary EventBase for the server socket. Therefore we require the caller // to specify a nullptr EventBase if they want to ensure that the callback is // always invoked in the primary EventBase, and to be able to invoke that // callback more efficiently without having to use a notification queue. RemoteAcceptor* acceptor = nullptr; try { acceptor = new RemoteAcceptor(callback, connectionEventCallback_); acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_); } catch (...) { callbacks_.pop_back(); delete acceptor; throw; } callbacks_.back().consumer = acceptor; } void AsyncServerSocket::removeAcceptCallback( AcceptCallback* callback, EventBase* eventBase) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } // Find the matching AcceptCallback. // We just do a simple linear search; we don't expect removeAcceptCallback() // to be called frequently, and we expect there to only be a small number of // callbacks anyway. std::vector::iterator it = callbacks_.begin(); uint32_t n = 0; while (true) { if (it == callbacks_.end()) { throw std::runtime_error( "AsyncServerSocket::removeAcceptCallback(): " "accept callback not found"); } if (it->callback == callback && (it->eventBase == eventBase || eventBase == nullptr)) { break; } ++it; ++n; } // Remove this callback from callbacks_. // // Do this before invoking the acceptStopped() callback, in case // acceptStopped() invokes one of our methods that examines callbacks_. // // Save a copy of the CallbackInfo first. CallbackInfo info(*it); callbacks_.erase(it); if (n < callbackIndex_) { // We removed an element before callbackIndex_. Move callbackIndex_ back // one step, since things after n have been shifted back by 1. --callbackIndex_; } else { // We removed something at or after callbackIndex_. // If we removed the last element and callbackIndex_ was pointing at it, // we need to reset callbackIndex_ to 0. if (callbackIndex_ >= callbacks_.size()) { callbackIndex_ = 0; } } if (info.consumer) { // consumer could be nullptr is we run callbacks in primary event // base DCHECK(info.eventBase); info.consumer->stop(info.eventBase, info.callback); } else { // callback invoked in the primary event base, just call directly DCHECK(info.callback); callback->acceptStopped(); } // If we are supposed to be accepting but the last accept callback // was removed, unregister for events until a callback is added. if (accepting_ && callbacks_.empty()) { for (auto& handler : sockets_) { handler.unregisterHandler(); } } } void AsyncServerSocket::startAccepting() { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } accepting_ = true; if (callbacks_.empty()) { // We can't actually begin accepting if no callbacks are defined. // Wait until a callback is added to start accepting. return; } for (auto& handler : sockets_) { if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) { throw std::runtime_error("failed to register for accept events"); } } } void AsyncServerSocket::pauseAccepting() { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); } accepting_ = false; for (auto& handler : sockets_) { handler.unregisterHandler(); } // If we were in the accept backoff state, disable the backoff timeout if (backoffTimeout_) { backoffTimeout_->cancelTimeout(); } } int AsyncServerSocket::createSocket(int family) { int fd = fsp::socket(family, SOCK_STREAM, 0); if (fd == -1) { folly::throwSystemError(errno, "error creating async server socket"); } try { setupSocket(fd, family); } catch (...) { closeNoInt(fd); throw; } return fd; } /** * Enable/Disable TOS reflection for the server socket * If enabled, the 'accepted' connections will reflect the * TOS derived from the client's connect request */ void AsyncServerSocket::setTosReflect(bool enable) { if (!kIsLinux || enable == false) { tosReflect_ = false; return; } for (auto& handler : sockets_) { if (handler.socket_ < 0) { continue; } int val = (enable) ? 1 : 0; int ret = setsockopt( handler.socket_, IPPROTO_TCP, TCP_SAVE_SYN, &val, sizeof(val)); if (ret == 0) { VLOG(10) << "Enabled SYN save for socket " << handler.socket_; } else { folly::throwSystemError(errno, "failed to enable TOS reflect"); } } tosReflect_ = true; } void AsyncServerSocket::setupSocket(int fd, int family) { // Put the socket in non-blocking mode if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { folly::throwSystemError(errno, "failed to put socket in non-blocking mode"); } // Set reuseaddr to avoid 2MSL delay on server restart int one = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) { // This isn't a fatal error; just log an error message and continue LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno; } // Set reuseport to support multiple accept threads int zero = 0; if (reusePortEnabled_ && setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(int)) != 0) { LOG(ERROR) << "failed to set SO_REUSEPORT on async server socket " << errnoStr(errno); #ifdef WIN32 folly::throwSystemError(errno, "failed to bind to the async server socket"); #else SocketAddress address; address.setFromLocalAddress(fd); folly::throwSystemError( errno, "failed to bind to async server socket: " + address.describe()); #endif } // Set keepalive as desired if (setsockopt( fd, SOL_SOCKET, SO_KEEPALIVE, (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) { LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " << errnoStr(errno); } // Setup FD_CLOEXEC flag if (closeOnExec_ && (-1 == folly::setCloseOnExec(fd, closeOnExec_))) { LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " << errnoStr(errno); } // Set TCP nodelay if available, MAC OS X Hack // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html #ifndef TCP_NOPUSH if (family != AF_UNIX) { if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) { // This isn't a fatal error; just log an error message and continue LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " << errnoStr(errno); } } #else (void)family; // to avoid unused parameter warning #endif #if FOLLY_ALLOW_TFO if (tfo_ && detail::tfo_enable(fd, tfoMaxQueueSize_) != 0) { // This isn't a fatal error; just log an error message and continue LOG(WARNING) << "failed to set TCP_FASTOPEN on async server socket: " << folly::errnoStr(errno); } #endif if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { shutdownSocketSet->add(fd); } } void AsyncServerSocket::handlerReady( uint16_t /* events */, int fd, sa_family_t addressFamily) noexcept { assert(!callbacks_.empty()); DestructorGuard dg(this); // Only accept up to maxAcceptAtOnce_ connections at a time, // to avoid starving other I/O handlers using this EventBase. for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) { SocketAddress address; sockaddr_storage addrStorage; socklen_t addrLen = sizeof(addrStorage); sockaddr* saddr = reinterpret_cast(&addrStorage); // In some cases, accept() doesn't seem to update these correctly. saddr->sa_family = addressFamily; if (addressFamily == AF_UNIX) { addrLen = sizeof(struct sockaddr_un); } // Accept a new client socket #ifdef SOCK_NONBLOCK int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK); #else int clientSocket = accept(fd, saddr, &addrLen); #endif address.setFromSockaddr(saddr, addrLen); if (clientSocket >= 0 && connectionEventCallback_) { connectionEventCallback_->onConnectionAccepted(clientSocket, address); } // Connection accepted, get the SYN packet from the client if // TOS reflect is enabled if (kIsLinux && clientSocket >= 0 && tosReflect_) { std::array buffer; socklen_t len = sizeof(buffer); int ret = getsockopt(clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len); if (ret == 0) { uint32_t tosWord = folly::Endian::big(buffer[0]); if (addressFamily == AF_INET6) { tosWord = (tosWord & 0x0FC00000) >> 20; ret = setsockopt( clientSocket, IPPROTO_IPV6, IPV6_TCLASS, &tosWord, sizeof(tosWord)); } else if (addressFamily == AF_INET) { tosWord = (tosWord & 0x00FC0000) >> 16; ret = setsockopt( clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord)); } if (ret != 0) { LOG(ERROR) << "Unable to set TOS for accepted socket " << clientSocket; } } else { LOG(ERROR) << "Unable to get SYN packet for accepted socket " << clientSocket; } } std::chrono::time_point nowMs = std::chrono::steady_clock::now(); auto timeSinceLastAccept = std::max( 0, nowMs.time_since_epoch().count() - lastAccepTimestamp_.time_since_epoch().count()); lastAccepTimestamp_ = nowMs; if (acceptRate_ < 1) { acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept; if (acceptRate_ >= 1) { acceptRate_ = 1; } else if (rand() > acceptRate_ * RAND_MAX) { ++numDroppedConnections_; if (clientSocket >= 0) { closeNoInt(clientSocket); if (connectionEventCallback_) { connectionEventCallback_->onConnectionDropped( clientSocket, address); } } continue; } } if (clientSocket < 0) { if (errno == EAGAIN) { // No more sockets to accept right now. // Check for this code first, since it's the most common. return; } else if (errno == EMFILE || errno == ENFILE) { // We're out of file descriptors. Perhaps we're accepting connections // too quickly. Pause accepting briefly to back off and give the server // a chance to recover. LOG(ERROR) << "accept failed: out of file descriptors; entering accept " "back-off state"; enterBackoff(); // Dispatch the error message dispatchError("accept() failed", errno); } else { dispatchError("accept() failed", errno); } if (connectionEventCallback_) { connectionEventCallback_->onConnectionAcceptError(errno); } return; } #ifndef SOCK_NONBLOCK // Explicitly set the new connection to non-blocking mode if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) { closeNoInt(clientSocket); dispatchError( "failed to set accepted socket to non-blocking mode", errno); if (connectionEventCallback_) { connectionEventCallback_->onConnectionDropped(clientSocket, address); } return; } #endif // Inform the callback about the new connection dispatchSocket(clientSocket, std::move(address)); // If we aren't accepting any more, break out of the loop if (!accepting_ || callbacks_.empty()) { break; } } } void AsyncServerSocket::dispatchSocket(int socket, SocketAddress&& address) { uint32_t startingIndex = callbackIndex_; // Short circuit if the callback is in the primary EventBase thread CallbackInfo* info = nextCallback(); if (info->eventBase == nullptr || info->eventBase == this->eventBase_) { info->callback->connectionAccepted(socket, address); return; } const SocketAddress addr(address); // Create a message to send over the notification queue QueueMessage msg; msg.type = MessageType::MSG_NEW_CONN; msg.address = std::move(address); msg.fd = socket; // Loop until we find a free queue to write to while (true) { if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) { if (connectionEventCallback_) { connectionEventCallback_->onConnectionEnqueuedForAcceptorCallback( socket, addr); } // Success! return. return; } // We couldn't add to queue. Fall through to below ++numDroppedConnections_; if (acceptRateAdjustSpeed_ > 0) { // aggressively decrease accept rate when in trouble static const double kAcceptRateDecreaseSpeed = 0.1; acceptRate_ *= 1 - kAcceptRateDecreaseSpeed; } if (callbackIndex_ == startingIndex) { // The notification queue was full // We can't really do anything at this point other than close the socket. // // This should only happen if a user's service is behaving extremely // badly and none of the EventBase threads are looping fast enough to // process the incoming connections. If the service is overloaded, it // should use pauseAccepting() to temporarily back off accepting new // connections, before they reach the point where their threads can't // even accept new messages. LOG_EVERY_N(ERROR, 100) << "failed to dispatch newly accepted socket:" << " all accept callback queues are full"; closeNoInt(socket); if (connectionEventCallback_) { connectionEventCallback_->onConnectionDropped(socket, addr); } return; } info = nextCallback(); } } void AsyncServerSocket::dispatchError(const char* msgstr, int errnoValue) { uint32_t startingIndex = callbackIndex_; CallbackInfo* info = nextCallback(); // Create a message to send over the notification queue QueueMessage msg; msg.type = MessageType::MSG_ERROR; msg.err = errnoValue; msg.msg = std::move(msgstr); while (true) { // Short circuit if the callback is in the primary EventBase thread if (info->eventBase == nullptr || info->eventBase == this->eventBase_) { std::runtime_error ex( std::string(msgstr) + folly::to(errnoValue)); info->callback->acceptError(ex); return; } if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) { return; } // Fall through and try another callback if (callbackIndex_ == startingIndex) { // The notification queues for all of the callbacks were full. // We can't really do anything at this point. LOG_EVERY_N(ERROR, 100) << "failed to dispatch accept error: all accept" << " callback queues are full: error msg: " << msg.msg << ": " << errnoValue; return; } info = nextCallback(); } } void AsyncServerSocket::enterBackoff() { // If this is the first time we have entered the backoff state, // allocate backoffTimeout_. if (backoffTimeout_ == nullptr) { try { backoffTimeout_ = new BackoffTimeout(this); } catch (const std::bad_alloc&) { // Man, we couldn't even allocate the timer to re-enable accepts. // We must be in pretty bad shape. Don't pause accepting for now, // since we won't be able to re-enable ourselves later. LOG(ERROR) << "failed to allocate AsyncServerSocket backoff" << " timer; unable to temporarly pause accepting"; if (connectionEventCallback_) { connectionEventCallback_->onBackoffError(); } return; } } // For now, we simply pause accepting for 1 second. // // We could add some smarter backoff calculation here in the future. (e.g., // start sleeping for longer if we keep hitting the backoff frequently.) // Typically the user needs to figure out why the server is overloaded and // fix it in some other way, though. The backoff timer is just a simple // mechanism to try and give the connection processing code a little bit of // breathing room to catch up, and to avoid just spinning and failing to // accept over and over again. const uint32_t timeoutMS = 1000; if (!backoffTimeout_->scheduleTimeout(timeoutMS)) { LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;" << "unable to temporarly pause accepting"; if (connectionEventCallback_) { connectionEventCallback_->onBackoffError(); } return; } // The backoff timer is scheduled to re-enable accepts. // Go ahead and disable accepts for now. We leave accepting_ set to true, // since that tracks the desired state requested by the user. for (auto& handler : sockets_) { handler.unregisterHandler(); } if (connectionEventCallback_) { connectionEventCallback_->onBackoffStarted(); } } void AsyncServerSocket::backoffTimeoutExpired() { // accepting_ should still be true. // If pauseAccepting() was called while in the backoff state it will cancel // the backoff timeout. assert(accepting_); // We can't be detached from the EventBase without being paused assert(eventBase_ != nullptr); eventBase_->dcheckIsInEventBaseThread(); // If all of the callbacks were removed, we shouldn't re-enable accepts if (callbacks_.empty()) { if (connectionEventCallback_) { connectionEventCallback_->onBackoffEnded(); } return; } // Register the handler. for (auto& handler : sockets_) { if (!handler.registerHandler(EventHandler::READ | EventHandler::PERSIST)) { // We're hosed. We could just re-schedule backoffTimeout_ to // re-try again after a little bit. However, we don't want to // loop retrying forever if we can't re-enable accepts. Just // abort the entire program in this state; things are really bad // and restarting the entire server is probably the best remedy. LOG(ERROR) << "failed to re-enable AsyncServerSocket accepts after backoff; " << "crashing now"; abort(); } } if (connectionEventCallback_) { connectionEventCallback_->onBackoffEnded(); } } } // namespace folly