AsyncIO.h 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <sys/types.h>
  18. #include <atomic>
  19. #include <cstdint>
  20. #include <deque>
  21. #include <functional>
  22. #include <iosfwd>
  23. #include <mutex>
  24. #include <utility>
  25. #include <vector>
  26. #include <boost/noncopyable.hpp>
  27. #include <libaio.h>
  28. #include <folly/Portability.h>
  29. #include <folly/Range.h>
  30. #include <folly/portability/SysUio.h>
  31. namespace folly {
  32. /**
  33. * An AsyncIOOp represents a pending operation. You may set a notification
  34. * callback or you may use this class's methods directly.
  35. *
  36. * The op must remain allocated until it is completed or canceled.
  37. */
  38. class AsyncIOOp : private boost::noncopyable {
  39. friend class AsyncIO;
  40. friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
  41. public:
  42. typedef std::function<void(AsyncIOOp*)> NotificationCallback;
  43. explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
  44. ~AsyncIOOp();
  45. enum class State {
  46. UNINITIALIZED,
  47. INITIALIZED,
  48. PENDING,
  49. COMPLETED,
  50. CANCELED,
  51. };
  52. /**
  53. * Initiate a read request.
  54. */
  55. void pread(int fd, void* buf, size_t size, off_t start);
  56. void pread(int fd, Range<unsigned char*> range, off_t start);
  57. void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
  58. /**
  59. * Initiate a write request.
  60. */
  61. void pwrite(int fd, const void* buf, size_t size, off_t start);
  62. void pwrite(int fd, Range<const unsigned char*> range, off_t start);
  63. void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
  64. /**
  65. * Return the current operation state.
  66. */
  67. State state() const {
  68. return state_;
  69. }
  70. /**
  71. * Reset the operation for reuse. It is an error to call reset() on
  72. * an Op that is still pending.
  73. */
  74. void reset(NotificationCallback cb = NotificationCallback());
  75. void setNotificationCallback(NotificationCallback cb) {
  76. cb_ = std::move(cb);
  77. }
  78. const NotificationCallback& notificationCallback() const {
  79. return cb_;
  80. }
  81. /**
  82. * Retrieve the result of this operation. Returns >=0 on success,
  83. * -errno on failure (that is, using the Linux kernel error reporting
  84. * conventions). Use checkKernelError (folly/Exception.h) on the result to
  85. * throw a std::system_error in case of error instead.
  86. *
  87. * It is an error to call this if the Op hasn't completed.
  88. */
  89. ssize_t result() const;
  90. private:
  91. void init();
  92. void start();
  93. void complete(ssize_t result);
  94. void cancel();
  95. NotificationCallback cb_;
  96. iocb iocb_;
  97. State state_;
  98. ssize_t result_;
  99. };
  100. std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
  101. std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
  102. /**
  103. * C++ interface around Linux Async IO.
  104. */
  105. class AsyncIO : private boost::noncopyable {
  106. public:
  107. typedef AsyncIOOp Op;
  108. enum PollMode {
  109. NOT_POLLABLE,
  110. POLLABLE,
  111. };
  112. /**
  113. * Create an AsyncIO context capable of holding at most 'capacity' pending
  114. * requests at the same time. As requests complete, others can be scheduled,
  115. * as long as this limit is not exceeded.
  116. *
  117. * Note: the maximum number of allowed concurrent requests is controlled
  118. * by the fs.aio-max-nr sysctl, the default value is usually 64K.
  119. *
  120. * If pollMode is POLLABLE, pollFd() will return a file descriptor that
  121. * can be passed to poll / epoll / select and will become readable when
  122. * any IOs on this AsyncIO have completed. If you do this, you must use
  123. * pollCompleted() instead of wait() -- do not read from the pollFd()
  124. * file descriptor directly.
  125. *
  126. * You may use the same AsyncIO object from multiple threads, as long as
  127. * there is only one concurrent caller of wait() / pollCompleted() / cancel()
  128. * (perhaps by always calling it from the same thread, or by providing
  129. * appropriate mutual exclusion). In this case, pending() returns a snapshot
  130. * of the current number of pending requests.
  131. */
  132. explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
  133. ~AsyncIO();
  134. /**
  135. * Wait for at least minRequests to complete. Returns the requests that
  136. * have completed; the returned range is valid until the next call to
  137. * wait(). minRequests may be 0 to not block.
  138. */
  139. Range<Op**> wait(size_t minRequests);
  140. /**
  141. * Cancel all pending requests and return them; the returned range is
  142. * valid until the next call to cancel().
  143. */
  144. Range<Op**> cancel();
  145. /**
  146. * Return the number of pending requests.
  147. */
  148. size_t pending() const {
  149. return pending_;
  150. }
  151. /**
  152. * Return the maximum number of requests that can be kept outstanding
  153. * at any one time.
  154. */
  155. size_t capacity() const {
  156. return capacity_;
  157. }
  158. /**
  159. * Return the accumulative number of submitted I/O, since this object
  160. * has been created.
  161. */
  162. size_t totalSubmits() const {
  163. return submitted_;
  164. }
  165. /**
  166. * If POLLABLE, return a file descriptor that can be passed to poll / epoll
  167. * and will become readable when any async IO operations have completed.
  168. * If NOT_POLLABLE, return -1.
  169. */
  170. int pollFd() const {
  171. return pollFd_;
  172. }
  173. /**
  174. * If POLLABLE, call instead of wait after the file descriptor returned
  175. * by pollFd() became readable. The returned range is valid until the next
  176. * call to pollCompleted().
  177. */
  178. Range<Op**> pollCompleted();
  179. /**
  180. * Submit an op for execution.
  181. */
  182. void submit(Op* op);
  183. private:
  184. void decrementPending();
  185. void initializeContext();
  186. enum class WaitType { COMPLETE, CANCEL };
  187. Range<AsyncIO::Op**> doWait(
  188. WaitType type,
  189. size_t minRequests,
  190. size_t maxRequests,
  191. std::vector<Op*>& result);
  192. io_context_t ctx_{nullptr};
  193. std::atomic<bool> ctxSet_{false};
  194. std::mutex initMutex_;
  195. std::atomic<size_t> pending_{0};
  196. std::atomic<size_t> submitted_{0};
  197. const size_t capacity_;
  198. int pollFd_{-1};
  199. std::vector<Op*> completed_;
  200. std::vector<Op*> canceled_;
  201. };
  202. /**
  203. * Wrapper around AsyncIO that allows you to schedule more requests than
  204. * the AsyncIO's object capacity. Other requests are queued and processed
  205. * in a FIFO order.
  206. */
  207. class AsyncIOQueue {
  208. public:
  209. /**
  210. * Create a queue, using the given AsyncIO object.
  211. * The AsyncIO object may not be used by anything else until the
  212. * queue is destroyed.
  213. */
  214. explicit AsyncIOQueue(AsyncIO* asyncIO);
  215. ~AsyncIOQueue();
  216. size_t queued() const {
  217. return queue_.size();
  218. }
  219. /**
  220. * Submit an op to the AsyncIO queue. The op will be queued until
  221. * the AsyncIO object has room.
  222. */
  223. void submit(AsyncIOOp* op);
  224. /**
  225. * Submit a delayed op to the AsyncIO queue; this allows you to postpone
  226. * creation of the Op (which may require allocating memory, etc) until
  227. * the AsyncIO object has room.
  228. */
  229. typedef std::function<AsyncIOOp*()> OpFactory;
  230. void submit(OpFactory op);
  231. private:
  232. void onCompleted(AsyncIOOp* op);
  233. void maybeDequeue();
  234. AsyncIO* asyncIO_;
  235. std::deque<OpFactory> queue_;
  236. };
  237. } // namespace folly