AsyncIO.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/experimental/io/AsyncIO.h>
  17. #include <sys/eventfd.h>
  18. #include <cerrno>
  19. #include <ostream>
  20. #include <stdexcept>
  21. #include <string>
  22. #include <boost/intrusive/parent_from_member.hpp>
  23. #include <glog/logging.h>
  24. #include <folly/Exception.h>
  25. #include <folly/Format.h>
  26. #include <folly/Likely.h>
  27. #include <folly/String.h>
  28. #include <folly/portability/Unistd.h>
  29. namespace folly {
  30. AsyncIOOp::AsyncIOOp(NotificationCallback cb)
  31. : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
  32. memset(&iocb_, 0, sizeof(iocb_));
  33. }
  34. void AsyncIOOp::reset(NotificationCallback cb) {
  35. CHECK_NE(state_, State::PENDING);
  36. cb_ = std::move(cb);
  37. state_ = State::UNINITIALIZED;
  38. result_ = -EINVAL;
  39. memset(&iocb_, 0, sizeof(iocb_));
  40. }
  41. AsyncIOOp::~AsyncIOOp() {
  42. CHECK_NE(state_, State::PENDING);
  43. }
  44. void AsyncIOOp::start() {
  45. DCHECK_EQ(state_, State::INITIALIZED);
  46. state_ = State::PENDING;
  47. }
  48. void AsyncIOOp::complete(ssize_t result) {
  49. DCHECK_EQ(state_, State::PENDING);
  50. state_ = State::COMPLETED;
  51. result_ = result;
  52. if (cb_) {
  53. cb_(this);
  54. }
  55. }
  56. void AsyncIOOp::cancel() {
  57. DCHECK_EQ(state_, State::PENDING);
  58. state_ = State::CANCELED;
  59. }
  60. ssize_t AsyncIOOp::result() const {
  61. CHECK_EQ(state_, State::COMPLETED);
  62. return result_;
  63. }
  64. void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
  65. init();
  66. io_prep_pread(&iocb_, fd, buf, size, start);
  67. }
  68. void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
  69. pread(fd, range.begin(), range.size(), start);
  70. }
  71. void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
  72. init();
  73. io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
  74. }
  75. void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
  76. init();
  77. io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
  78. }
  79. void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
  80. pwrite(fd, range.begin(), range.size(), start);
  81. }
  82. void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
  83. init();
  84. io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
  85. }
  86. void AsyncIOOp::init() {
  87. CHECK_EQ(state_, State::UNINITIALIZED);
  88. state_ = State::INITIALIZED;
  89. }
  90. AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
  91. CHECK_GT(capacity_, 0);
  92. completed_.reserve(capacity_);
  93. if (pollMode == POLLABLE) {
  94. pollFd_ = eventfd(0, EFD_NONBLOCK);
  95. checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
  96. }
  97. }
  98. AsyncIO::~AsyncIO() {
  99. CHECK_EQ(pending_, 0);
  100. if (ctx_) {
  101. int rc = io_queue_release(ctx_);
  102. CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
  103. }
  104. if (pollFd_ != -1) {
  105. CHECK_ERR(close(pollFd_));
  106. }
  107. }
  108. void AsyncIO::decrementPending() {
  109. auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
  110. DCHECK_GE(p, 1);
  111. }
  112. void AsyncIO::initializeContext() {
  113. if (!ctxSet_.load(std::memory_order_acquire)) {
  114. std::lock_guard<std::mutex> lock(initMutex_);
  115. if (!ctxSet_.load(std::memory_order_relaxed)) {
  116. int rc = io_queue_init(capacity_, &ctx_);
  117. // returns negative errno
  118. if (rc == -EAGAIN) {
  119. long aio_nr, aio_max;
  120. std::unique_ptr<FILE, int (*)(FILE*)> fp(
  121. fopen("/proc/sys/fs/aio-nr", "r"), fclose);
  122. PCHECK(fp);
  123. CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
  124. std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
  125. fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
  126. PCHECK(aio_max_fp);
  127. CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
  128. LOG(ERROR) << "No resources for requested capacity of " << capacity_;
  129. LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
  130. }
  131. checkKernelError(rc, "AsyncIO: io_queue_init failed");
  132. DCHECK(ctx_);
  133. ctxSet_.store(true, std::memory_order_release);
  134. }
  135. }
  136. }
  137. void AsyncIO::submit(Op* op) {
  138. CHECK_EQ(op->state(), Op::State::INITIALIZED);
  139. initializeContext(); // on demand
  140. // We can increment past capacity, but we'll clean up after ourselves.
  141. auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
  142. if (p >= capacity_) {
  143. decrementPending();
  144. throw std::range_error("AsyncIO: too many pending requests");
  145. }
  146. iocb* cb = &op->iocb_;
  147. cb->data = nullptr; // unused
  148. if (pollFd_ != -1) {
  149. io_set_eventfd(cb, pollFd_);
  150. }
  151. int rc = io_submit(ctx_, 1, &cb);
  152. if (rc < 0) {
  153. decrementPending();
  154. throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
  155. }
  156. submitted_++;
  157. DCHECK_EQ(rc, 1);
  158. op->start();
  159. }
  160. Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
  161. CHECK(ctx_);
  162. CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
  163. auto p = pending_.load(std::memory_order_acquire);
  164. CHECK_LE(minRequests, p);
  165. return doWait(WaitType::COMPLETE, minRequests, p, completed_);
  166. }
  167. Range<AsyncIO::Op**> AsyncIO::cancel() {
  168. CHECK(ctx_);
  169. auto p = pending_.load(std::memory_order_acquire);
  170. return doWait(WaitType::CANCEL, p, p, canceled_);
  171. }
  172. Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
  173. CHECK(ctx_);
  174. CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
  175. uint64_t numEvents;
  176. // This sets the eventFd counter to 0, see
  177. // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
  178. ssize_t rc;
  179. do {
  180. rc = ::read(pollFd_, &numEvents, 8);
  181. } while (rc == -1 && errno == EINTR);
  182. if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
  183. return Range<Op**>(); // nothing completed
  184. }
  185. checkUnixError(rc, "AsyncIO: read from event fd failed");
  186. DCHECK_EQ(rc, 8);
  187. DCHECK_GT(numEvents, 0);
  188. DCHECK_LE(numEvents, pending_);
  189. // Don't reap more than numEvents, as we've just reset the counter to 0.
  190. return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
  191. }
  192. Range<AsyncIO::Op**> AsyncIO::doWait(
  193. WaitType type,
  194. size_t minRequests,
  195. size_t maxRequests,
  196. std::vector<Op*>& result) {
  197. io_event events[maxRequests];
  198. // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
  199. // WaitType::CANCEL we have to wait for IO completion.
  200. size_t count = 0;
  201. do {
  202. int ret;
  203. do {
  204. // GOTCHA: io_getevents() may returns less than min_nr results if
  205. // interrupted after some events have been read (if before, -EINTR
  206. // is returned).
  207. ret = io_getevents(
  208. ctx_,
  209. minRequests - count,
  210. maxRequests - count,
  211. events + count,
  212. /* timeout */ nullptr); // wait forever
  213. } while (ret == -EINTR);
  214. // Check as may not be able to recover without leaking events.
  215. CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
  216. << errnoStr(-ret);
  217. count += ret;
  218. } while (count < minRequests);
  219. DCHECK_LE(count, maxRequests);
  220. result.clear();
  221. for (size_t i = 0; i < count; ++i) {
  222. DCHECK(events[i].obj);
  223. Op* op = boost::intrusive::get_parent_from_member(
  224. events[i].obj, &AsyncIOOp::iocb_);
  225. decrementPending();
  226. switch (type) {
  227. case WaitType::COMPLETE:
  228. op->complete(events[i].res);
  229. break;
  230. case WaitType::CANCEL:
  231. op->cancel();
  232. break;
  233. }
  234. result.push_back(op);
  235. }
  236. return range(result);
  237. }
  238. AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
  239. AsyncIOQueue::~AsyncIOQueue() {
  240. CHECK_EQ(asyncIO_->pending(), 0);
  241. }
  242. void AsyncIOQueue::submit(AsyncIOOp* op) {
  243. submit([op]() { return op; });
  244. }
  245. void AsyncIOQueue::submit(OpFactory op) {
  246. queue_.push_back(op);
  247. maybeDequeue();
  248. }
  249. void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
  250. maybeDequeue();
  251. }
  252. void AsyncIOQueue::maybeDequeue() {
  253. while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
  254. auto& opFactory = queue_.front();
  255. auto op = opFactory();
  256. queue_.pop_front();
  257. // Interpose our completion callback
  258. auto& nextCb = op->notificationCallback();
  259. op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
  260. this->onCompleted(op2);
  261. if (nextCb) {
  262. nextCb(op2);
  263. }
  264. });
  265. asyncIO_->submit(op);
  266. }
  267. }
  268. // debugging helpers:
  269. namespace {
  270. #define X(c) \
  271. case c: \
  272. return #c
  273. const char* asyncIoOpStateToString(AsyncIOOp::State state) {
  274. switch (state) {
  275. X(AsyncIOOp::State::UNINITIALIZED);
  276. X(AsyncIOOp::State::INITIALIZED);
  277. X(AsyncIOOp::State::PENDING);
  278. X(AsyncIOOp::State::COMPLETED);
  279. X(AsyncIOOp::State::CANCELED);
  280. }
  281. return "<INVALID AsyncIOOp::State>";
  282. }
  283. const char* iocbCmdToString(short int cmd_short) {
  284. io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
  285. switch (cmd) {
  286. X(IO_CMD_PREAD);
  287. X(IO_CMD_PWRITE);
  288. X(IO_CMD_FSYNC);
  289. X(IO_CMD_FDSYNC);
  290. X(IO_CMD_POLL);
  291. X(IO_CMD_NOOP);
  292. X(IO_CMD_PREADV);
  293. X(IO_CMD_PWRITEV);
  294. };
  295. return "<INVALID io_iocb_cmd>";
  296. }
  297. #undef X
  298. std::string fd2name(int fd) {
  299. std::string path = folly::to<std::string>("/proc/self/fd/", fd);
  300. char link[PATH_MAX];
  301. const ssize_t length =
  302. std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
  303. return path.assign(link, length);
  304. }
  305. std::ostream& operator<<(std::ostream& os, const iocb& cb) {
  306. os << folly::format(
  307. "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
  308. cb.data,
  309. cb.key,
  310. iocbCmdToString(cb.aio_lio_opcode),
  311. cb.aio_reqprio,
  312. cb.aio_fildes,
  313. fd2name(cb.aio_fildes));
  314. switch (cb.aio_lio_opcode) {
  315. case IO_CMD_PREAD:
  316. case IO_CMD_PWRITE:
  317. os << folly::format(
  318. "buf={}, offset={}, nbytes={}, ",
  319. cb.u.c.buf,
  320. cb.u.c.offset,
  321. cb.u.c.nbytes);
  322. break;
  323. default:
  324. os << "[TODO: write debug string for "
  325. << iocbCmdToString(cb.aio_lio_opcode) << "] ";
  326. break;
  327. }
  328. return os;
  329. }
  330. } // namespace
  331. std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
  332. os << "{" << op.state_ << ", ";
  333. if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
  334. os << op.iocb_;
  335. }
  336. if (op.state_ == AsyncIOOp::State::COMPLETED) {
  337. os << "result=" << op.result_;
  338. if (op.result_ < 0) {
  339. os << " (" << errnoStr(-op.result_) << ')';
  340. }
  341. os << ", ";
  342. }
  343. return os << "}";
  344. }
  345. std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
  346. return os << asyncIoOpStateToString(state);
  347. }
  348. } // namespace folly