AsyncUDPServerSocket.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <folly/Memory.h>
  18. #include <folly/io/IOBufQueue.h>
  19. #include <folly/io/async/AsyncUDPSocket.h>
  20. #include <folly/io/async/EventBase.h>
  21. namespace folly {
  22. /**
  23. * UDP server socket
  24. *
  25. * It wraps a UDP socket waiting for packets and distributes them among
  26. * a set of event loops in round robin fashion.
  27. *
  28. * NOTE: At the moment it is designed to work with single packet protocols
  29. * in mind. We distribute incoming packets among all the listeners in
  30. * round-robin fashion. So, any protocol that expects to send/recv
  31. * more than 1 packet will not work because they will end up with
  32. * different event base to process.
  33. */
  34. class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
  35. public AsyncSocketBase {
  36. public:
  37. class Callback {
  38. public:
  39. /**
  40. * Invoked when we start reading data from socket. It is invoked in
  41. * each acceptors/listeners event base thread.
  42. */
  43. virtual void onListenStarted() noexcept = 0;
  44. /**
  45. * Invoked when the server socket is closed. It is invoked in each
  46. * acceptors/listeners event base thread.
  47. */
  48. virtual void onListenStopped() noexcept = 0;
  49. /**
  50. * Invoked when the server socket is paused. It is invoked in each
  51. * acceptors/listeners event base thread.
  52. */
  53. virtual void onListenPaused() noexcept {}
  54. /**
  55. * Invoked when the server socket is resumed. It is invoked in each
  56. * acceptors/listeners event base thread.
  57. */
  58. virtual void onListenResumed() noexcept {}
  59. /**
  60. * Invoked when a new packet is received
  61. */
  62. virtual void onDataAvailable(
  63. std::shared_ptr<AsyncUDPSocket> socket,
  64. const folly::SocketAddress& addr,
  65. std::unique_ptr<folly::IOBuf> buf,
  66. bool truncated) noexcept = 0;
  67. virtual ~Callback() = default;
  68. };
  69. /**
  70. * Create a new UDP server socket
  71. *
  72. * Note about packet size - We allocate buffer of packetSize_ size to read.
  73. * If packet are larger than this value, as per UDP protocol, remaining data
  74. * is dropped and you get `truncated = true` in onDataAvailable callback
  75. */
  76. explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
  77. : evb_(evb), packetSize_(sz), nextListener_(0) {}
  78. ~AsyncUDPServerSocket() override {
  79. if (socket_) {
  80. close();
  81. }
  82. }
  83. void bind(const folly::SocketAddress& addy) {
  84. CHECK(!socket_);
  85. socket_ = std::make_shared<AsyncUDPSocket>(evb_);
  86. socket_->setReusePort(reusePort_);
  87. socket_->bind(addy);
  88. }
  89. void setReusePort(bool reusePort) {
  90. reusePort_ = reusePort;
  91. }
  92. folly::SocketAddress address() const {
  93. CHECK(socket_);
  94. return socket_->address();
  95. }
  96. void getAddress(SocketAddress* a) const override {
  97. *a = address();
  98. }
  99. /**
  100. * Add a listener to the round robin list
  101. */
  102. void addListener(EventBase* evb, Callback* callback) {
  103. listeners_.emplace_back(evb, callback);
  104. }
  105. void listen() {
  106. CHECK(socket_) << "Need to bind before listening";
  107. for (auto& listener : listeners_) {
  108. auto callback = listener.second;
  109. listener.first->runInEventBaseThread(
  110. [callback]() mutable { callback->onListenStarted(); });
  111. }
  112. socket_->resumeRead(this);
  113. }
  114. int getFD() const {
  115. CHECK(socket_) << "Need to bind before getting FD";
  116. return socket_->getFD();
  117. }
  118. void close() {
  119. CHECK(socket_) << "Need to bind before closing";
  120. socket_->close();
  121. socket_.reset();
  122. }
  123. EventBase* getEventBase() const override {
  124. return evb_;
  125. }
  126. /**
  127. * Pauses accepting datagrams on the underlying socket.
  128. */
  129. void pauseAccepting() {
  130. socket_->pauseRead();
  131. for (auto& listener : listeners_) {
  132. auto callback = listener.second;
  133. listener.first->runInEventBaseThread(
  134. [callback]() mutable { callback->onListenPaused(); });
  135. }
  136. }
  137. /**
  138. * Starts accepting datagrams once again.
  139. */
  140. void resumeAccepting() {
  141. socket_->resumeRead(this);
  142. for (auto& listener : listeners_) {
  143. auto callback = listener.second;
  144. listener.first->runInEventBaseThread(
  145. [callback]() mutable { callback->onListenResumed(); });
  146. }
  147. }
  148. private:
  149. // AsyncUDPSocket::ReadCallback
  150. void getReadBuffer(void** buf, size_t* len) noexcept override {
  151. std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
  152. }
  153. void onDataAvailable(
  154. const folly::SocketAddress& clientAddress,
  155. size_t len,
  156. bool truncated) noexcept override {
  157. buf_.postallocate(len);
  158. auto data = buf_.split(len);
  159. if (listeners_.empty()) {
  160. LOG(WARNING) << "UDP server socket dropping packet, "
  161. << "no listener registered";
  162. return;
  163. }
  164. if (nextListener_ >= listeners_.size()) {
  165. nextListener_ = 0;
  166. }
  167. auto client = clientAddress;
  168. auto callback = listeners_[nextListener_].second;
  169. auto socket = socket_;
  170. // Schedule it in the listener's eventbase
  171. // XXX: Speed this up
  172. auto f = [socket,
  173. client,
  174. callback,
  175. data = std::move(data),
  176. truncated]() mutable {
  177. callback->onDataAvailable(socket, client, std::move(data), truncated);
  178. };
  179. listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
  180. ++nextListener_;
  181. }
  182. void onReadError(const AsyncSocketException& ex) noexcept override {
  183. LOG(ERROR) << ex.what();
  184. // Lets register to continue listening for packets
  185. socket_->resumeRead(this);
  186. }
  187. void onReadClosed() noexcept override {
  188. for (auto& listener : listeners_) {
  189. auto callback = listener.second;
  190. listener.first->runInEventBaseThread(
  191. [callback]() mutable { callback->onListenStopped(); });
  192. }
  193. }
  194. EventBase* const evb_;
  195. const size_t packetSize_;
  196. std::shared_ptr<AsyncUDPSocket> socket_;
  197. // List of listener to distribute packets among
  198. typedef std::pair<EventBase*, Callback*> Listener;
  199. std::vector<Listener> listeners_;
  200. // Next listener to send packet to
  201. uint32_t nextListener_;
  202. // Temporary buffer for data
  203. folly::IOBufQueue buf_;
  204. bool reusePort_{false};
  205. };
  206. } // namespace folly