AsyncPipeTest.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/async/AsyncPipe.h>
  17. #include <folly/Memory.h>
  18. #include <folly/io/async/EventBase.h>
  19. #include <folly/portability/GTest.h>
  20. #include <fcntl.h>
  21. using namespace testing;
  22. namespace {
  23. class TestReadCallback : public folly::AsyncReader::ReadCallback {
  24. public:
  25. bool isBufferMovable() noexcept override {
  26. return movable_;
  27. }
  28. void setMovable(bool movable) {
  29. movable_ = movable;
  30. }
  31. void readBufferAvailable(
  32. std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
  33. readBuffer_.append(std::move(readBuf));
  34. }
  35. void readDataAvailable(size_t len) noexcept override {
  36. readBuffer_.postallocate(len);
  37. }
  38. void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
  39. auto res = readBuffer_.preallocate(4000, 65000);
  40. *bufReturn = res.first;
  41. *lenReturn = res.second;
  42. }
  43. void readEOF() noexcept override {}
  44. void readErr(const folly::AsyncSocketException&) noexcept override {
  45. error_ = true;
  46. }
  47. std::string getData() {
  48. auto buf = readBuffer_.move();
  49. buf->coalesce();
  50. return std::string((char*)buf->data(), buf->length());
  51. }
  52. void reset() {
  53. movable_ = false;
  54. error_ = false;
  55. readBuffer_.clear();
  56. }
  57. folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
  58. bool error_{false};
  59. bool movable_{false};
  60. };
  61. class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
  62. public:
  63. void writeSuccess() noexcept override {
  64. writes_++;
  65. }
  66. void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
  67. error_ = true;
  68. }
  69. void reset() {
  70. writes_ = 0;
  71. error_ = false;
  72. }
  73. uint32_t writes_{0};
  74. bool error_{false};
  75. };
  76. class AsyncPipeTest : public Test {
  77. public:
  78. void reset(bool movable) {
  79. reader_.reset();
  80. readCallback_.reset();
  81. writer_.reset();
  82. writeCallback_.reset();
  83. int rc = pipe(pipeFds_);
  84. EXPECT_EQ(rc, 0);
  85. EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
  86. EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
  87. reader_ = folly::AsyncPipeReader::newReader(&eventBase_, pipeFds_[0]);
  88. writer_ = folly::AsyncPipeWriter::newWriter(&eventBase_, pipeFds_[1]);
  89. readCallback_.setMovable(movable);
  90. }
  91. protected:
  92. folly::EventBase eventBase_;
  93. int pipeFds_[2];
  94. folly::AsyncPipeReader::UniquePtr reader_;
  95. folly::AsyncPipeWriter::UniquePtr writer_;
  96. TestReadCallback readCallback_;
  97. TestWriteCallback writeCallback_;
  98. };
  99. std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
  100. auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
  101. return buf;
  102. }
  103. } // namespace
  104. TEST_F(AsyncPipeTest, simple) {
  105. for (int pass = 0; pass < 2; ++pass) {
  106. reset(pass % 2 != 0);
  107. reader_->setReadCB(&readCallback_);
  108. writer_->write(getBuf("hello"), &writeCallback_);
  109. writer_->closeOnEmpty();
  110. eventBase_.loop();
  111. EXPECT_EQ(readCallback_.getData(), "hello");
  112. EXPECT_FALSE(readCallback_.error_);
  113. EXPECT_EQ(writeCallback_.writes_, 1);
  114. EXPECT_FALSE(writeCallback_.error_);
  115. }
  116. }
  117. TEST_F(AsyncPipeTest, blocked_writes) {
  118. for (int pass = 0; pass < 2; ++pass) {
  119. reset(pass % 2 != 0);
  120. uint32_t writeAttempts = 0;
  121. do {
  122. ++writeAttempts;
  123. writer_->write(getBuf("hello"), &writeCallback_);
  124. } while (writeCallback_.writes_ == writeAttempts);
  125. // there is one blocked write
  126. writer_->closeOnEmpty();
  127. reader_->setReadCB(&readCallback_);
  128. eventBase_.loop();
  129. std::string expected;
  130. for (uint32_t i = 0; i < writeAttempts; i++) {
  131. expected += "hello";
  132. }
  133. EXPECT_EQ(readCallback_.getData(), expected);
  134. EXPECT_FALSE(readCallback_.error_);
  135. EXPECT_EQ(writeCallback_.writes_, writeAttempts);
  136. EXPECT_FALSE(writeCallback_.error_);
  137. }
  138. }
  139. TEST_F(AsyncPipeTest, writeOnClose) {
  140. for (int pass = 0; pass < 2; ++pass) {
  141. reset(pass % 2 != 0);
  142. reader_->setReadCB(&readCallback_);
  143. writer_->write(getBuf("hello"), &writeCallback_);
  144. writer_->closeOnEmpty();
  145. writer_->write(getBuf("hello"), &writeCallback_);
  146. eventBase_.loop();
  147. EXPECT_EQ(readCallback_.getData(), "hello");
  148. EXPECT_FALSE(readCallback_.error_);
  149. EXPECT_EQ(writeCallback_.writes_, 1);
  150. EXPECT_TRUE(writeCallback_.error_);
  151. }
  152. }