BlockingSocket.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <folly/Optional.h>
  18. #include <folly/io/async/AsyncSSLSocket.h>
  19. #include <folly/io/async/AsyncSocket.h>
  20. #include <folly/io/async/SSLContext.h>
  21. class BlockingSocket : public folly::AsyncSocket::ConnectCallback,
  22. public folly::AsyncTransportWrapper::ReadCallback,
  23. public folly::AsyncTransportWrapper::WriteCallback {
  24. public:
  25. explicit BlockingSocket(int fd)
  26. : sock_(new folly::AsyncSocket(&eventBase_, fd)) {}
  27. BlockingSocket(
  28. folly::SocketAddress address,
  29. std::shared_ptr<folly::SSLContext> sslContext)
  30. : sock_(
  31. sslContext ? new folly::AsyncSSLSocket(sslContext, &eventBase_)
  32. : new folly::AsyncSocket(&eventBase_)),
  33. address_(address) {}
  34. explicit BlockingSocket(folly::AsyncSocket::UniquePtr socket)
  35. : sock_(std::move(socket)) {
  36. sock_->attachEventBase(&eventBase_);
  37. }
  38. void enableTFO() {
  39. sock_->enableTFO();
  40. }
  41. void setAddress(folly::SocketAddress address) {
  42. address_ = address;
  43. }
  44. void open(
  45. std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) {
  46. sock_->connect(this, address_, timeout.count());
  47. eventBase_.loop();
  48. if (err_.hasValue()) {
  49. throw err_.value();
  50. }
  51. }
  52. void close() {
  53. sock_->close();
  54. }
  55. void closeWithReset() {
  56. sock_->closeWithReset();
  57. }
  58. int32_t write(uint8_t const* buf, size_t len) {
  59. sock_->write(this, buf, len);
  60. eventBase_.loop();
  61. if (err_.hasValue()) {
  62. throw err_.value();
  63. }
  64. return len;
  65. }
  66. void flush() {}
  67. int32_t readAll(uint8_t* buf, size_t len) {
  68. return readHelper(buf, len, true);
  69. }
  70. int32_t read(uint8_t* buf, size_t len) {
  71. return readHelper(buf, len, false);
  72. }
  73. int getSocketFD() const {
  74. return sock_->getFd();
  75. }
  76. folly::AsyncSocket* getSocket() {
  77. return sock_.get();
  78. }
  79. folly::AsyncSSLSocket* getSSLSocket() {
  80. return dynamic_cast<folly::AsyncSSLSocket*>(sock_.get());
  81. }
  82. private:
  83. folly::EventBase eventBase_;
  84. folly::AsyncSocket::UniquePtr sock_;
  85. folly::Optional<folly::AsyncSocketException> err_;
  86. uint8_t* readBuf_{nullptr};
  87. size_t readLen_{0};
  88. folly::SocketAddress address_;
  89. void connectSuccess() noexcept override {}
  90. void connectErr(const folly::AsyncSocketException& ex) noexcept override {
  91. err_ = ex;
  92. }
  93. void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
  94. *bufReturn = readBuf_;
  95. *lenReturn = readLen_;
  96. }
  97. void readDataAvailable(size_t len) noexcept override {
  98. readBuf_ += len;
  99. readLen_ -= len;
  100. if (readLen_ == 0) {
  101. sock_->setReadCB(nullptr);
  102. }
  103. }
  104. void readEOF() noexcept override {}
  105. void readErr(const folly::AsyncSocketException& ex) noexcept override {
  106. err_ = ex;
  107. }
  108. void writeSuccess() noexcept override {}
  109. void writeErr(
  110. size_t /* bytesWritten */,
  111. const folly::AsyncSocketException& ex) noexcept override {
  112. err_ = ex;
  113. }
  114. int32_t readHelper(uint8_t* buf, size_t len, bool all) {
  115. if (!sock_->good()) {
  116. return 0;
  117. }
  118. readBuf_ = buf;
  119. readLen_ = len;
  120. sock_->setReadCB(this);
  121. while (!err_ && sock_->good() && readLen_ > 0) {
  122. eventBase_.loopOnce();
  123. if (!all) {
  124. break;
  125. }
  126. }
  127. sock_->setReadCB(nullptr);
  128. if (err_.hasValue()) {
  129. throw err_.value();
  130. }
  131. if (all && readLen_ > 0) {
  132. throw folly::AsyncSocketException(
  133. folly::AsyncSocketException::UNKNOWN, "eof");
  134. }
  135. return len - readLen_;
  136. }
  137. };