AsyncPipe.cpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/async/AsyncPipe.h>
  17. #include <folly/FileUtil.h>
  18. #include <folly/io/async/AsyncSocketException.h>
  19. using folly::IOBuf;
  20. using folly::IOBufQueue;
  21. using std::string;
  22. using std::unique_ptr;
  23. namespace folly {
  24. AsyncPipeReader::~AsyncPipeReader() {
  25. close();
  26. }
  27. void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
  28. VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_
  29. << "): failed while reading: " << ex.what();
  30. DCHECK(readCallback_ != nullptr);
  31. AsyncReader::ReadCallback* callback = readCallback_;
  32. readCallback_ = nullptr;
  33. callback->readErr(ex);
  34. close();
  35. }
  36. void AsyncPipeReader::close() {
  37. unregisterHandler();
  38. if (fd_ >= 0) {
  39. changeHandlerFD(-1);
  40. if (closeCb_) {
  41. closeCb_(fd_);
  42. } else {
  43. ::close(fd_);
  44. }
  45. fd_ = -1;
  46. }
  47. }
  48. void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
  49. DestructorGuard dg(this);
  50. CHECK(events & EventHandler::READ);
  51. VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
  52. assert(readCallback_ != nullptr);
  53. while (readCallback_) {
  54. // - What API does callback support?
  55. const auto movable = readCallback_->isBufferMovable(); // noexcept
  56. // Get the buffer to read into.
  57. void* buf = nullptr;
  58. size_t buflen = 0;
  59. std::unique_ptr<IOBuf> ioBuf;
  60. if (movable) {
  61. ioBuf = IOBuf::create(readCallback_->maxBufferSize());
  62. buf = ioBuf->writableBuffer();
  63. buflen = ioBuf->capacity();
  64. } else {
  65. try {
  66. readCallback_->getReadBuffer(&buf, &buflen);
  67. } catch (const std::exception& ex) {
  68. AsyncSocketException aex(
  69. AsyncSocketException::BAD_ARGS,
  70. string("ReadCallback::getReadBuffer() "
  71. "threw exception: ") +
  72. ex.what());
  73. failRead(aex);
  74. return;
  75. } catch (...) {
  76. AsyncSocketException aex(
  77. AsyncSocketException::BAD_ARGS,
  78. string("ReadCallback::getReadBuffer() "
  79. "threw non-exception type"));
  80. failRead(aex);
  81. return;
  82. }
  83. if (buf == nullptr || buflen == 0) {
  84. AsyncSocketException aex(
  85. AsyncSocketException::INVALID_STATE,
  86. string("ReadCallback::getReadBuffer() "
  87. "returned empty buffer"));
  88. failRead(aex);
  89. return;
  90. }
  91. }
  92. // Perform the read
  93. ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
  94. if (bytesRead > 0) {
  95. if (movable) {
  96. ioBuf->append(std::size_t(bytesRead));
  97. readCallback_->readBufferAvailable(std::move(ioBuf));
  98. } else {
  99. readCallback_->readDataAvailable(size_t(bytesRead));
  100. }
  101. // Fall through and continue around the loop if the read
  102. // completely filled the available buffer.
  103. // Note that readCallback_ may have been uninstalled or changed inside
  104. // readDataAvailable().
  105. if (static_cast<size_t>(bytesRead) < buflen) {
  106. return;
  107. }
  108. } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  109. // No more data to read right now.
  110. return;
  111. } else if (bytesRead < 0) {
  112. AsyncSocketException ex(
  113. AsyncSocketException::INVALID_STATE, "read failed", errno);
  114. failRead(ex);
  115. return;
  116. } else {
  117. assert(bytesRead == 0);
  118. // EOF
  119. unregisterHandler();
  120. AsyncReader::ReadCallback* callback = readCallback_;
  121. readCallback_ = nullptr;
  122. callback->readEOF();
  123. return;
  124. }
  125. // Max reads per loop?
  126. }
  127. }
  128. void AsyncPipeWriter::write(
  129. unique_ptr<folly::IOBuf> buf,
  130. AsyncWriter::WriteCallback* callback) {
  131. if (closed()) {
  132. if (callback) {
  133. AsyncSocketException ex(
  134. AsyncSocketException::NOT_OPEN, "attempt to write to closed pipe");
  135. callback->writeErr(0, ex);
  136. }
  137. return;
  138. }
  139. bool wasEmpty = (queue_.empty());
  140. folly::IOBufQueue iobq;
  141. iobq.append(std::move(buf));
  142. std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
  143. std::move(iobq), callback);
  144. queue_.emplace_back(std::move(p));
  145. if (wasEmpty) {
  146. handleWrite();
  147. } else {
  148. CHECK(!queue_.empty());
  149. CHECK(isHandlerRegistered());
  150. }
  151. }
  152. void AsyncPipeWriter::writeChain(
  153. folly::AsyncWriter::WriteCallback* callback,
  154. std::unique_ptr<folly::IOBuf>&& buf,
  155. WriteFlags) {
  156. write(std::move(buf), callback);
  157. }
  158. void AsyncPipeWriter::closeOnEmpty() {
  159. VLOG(5) << "close on empty";
  160. if (queue_.empty()) {
  161. closeNow();
  162. } else {
  163. closeOnEmpty_ = true;
  164. CHECK(isHandlerRegistered());
  165. }
  166. }
  167. void AsyncPipeWriter::closeNow() {
  168. VLOG(5) << "close now";
  169. if (!queue_.empty()) {
  170. failAllWrites(AsyncSocketException(
  171. AsyncSocketException::NOT_OPEN, "closed with pending writes"));
  172. }
  173. if (fd_ >= 0) {
  174. unregisterHandler();
  175. changeHandlerFD(-1);
  176. if (closeCb_) {
  177. closeCb_(fd_);
  178. } else {
  179. close(fd_);
  180. }
  181. fd_ = -1;
  182. }
  183. }
  184. void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
  185. DestructorGuard dg(this);
  186. while (!queue_.empty()) {
  187. // the first entry of the queue could have had a partial write, but needs to
  188. // be tracked.
  189. if (queue_.front().second) {
  190. queue_.front().second->writeErr(0, ex);
  191. }
  192. queue_.pop_front();
  193. }
  194. }
  195. void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
  196. CHECK(events & EventHandler::WRITE);
  197. handleWrite();
  198. }
  199. void AsyncPipeWriter::handleWrite() {
  200. DestructorGuard dg(this);
  201. assert(!queue_.empty());
  202. do {
  203. auto& front = queue_.front();
  204. folly::IOBufQueue& curQueue = front.first;
  205. DCHECK(!curQueue.empty());
  206. // someday, support writev. The logic for partial writes is a bit complex
  207. const IOBuf* head = curQueue.front();
  208. CHECK(head->length());
  209. ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
  210. if (rc < 0) {
  211. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  212. // pipe is full
  213. VLOG(5) << "write blocked";
  214. registerHandler(EventHandler::WRITE);
  215. return;
  216. } else {
  217. failAllWrites(AsyncSocketException(
  218. AsyncSocketException::INTERNAL_ERROR, "write failed", errno));
  219. closeNow();
  220. return;
  221. }
  222. } else if (rc == 0) {
  223. registerHandler(EventHandler::WRITE);
  224. return;
  225. }
  226. curQueue.trimStart(size_t(rc));
  227. if (curQueue.empty()) {
  228. auto cb = front.second;
  229. queue_.pop_front();
  230. if (cb) {
  231. cb->writeSuccess();
  232. }
  233. } else {
  234. VLOG(5) << "partial write blocked";
  235. }
  236. } while (!queue_.empty());
  237. if (closeOnEmpty_) {
  238. closeNow();
  239. } else {
  240. unregisterHandler();
  241. }
  242. }
  243. } // namespace folly