// AsyncTransport.h
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  #include <cstddef>
  #include <cstdint>
  #include <memory>
  #include <string>

  #include <folly/io/IOBuf.h>
  #include <folly/io/async/AsyncSocketBase.h>
  #include <folly/io/async/AsyncTransportCertificate.h>
  #include <folly/io/async/DelayedDestruction.h>
  #include <folly/io/async/EventBase.h>
  #include <folly/portability/OpenSSL.h>
  #include <folly/portability/SysUio.h>
  #include <folly/ssl/OpenSSLPtrTypes.h>
  // Compile-time flag: true when the OpenSSL headers visible to this file
  // define SSL_MODE_MOVE_BUFFER_OWNERSHIP, false otherwise.
  #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
  constexpr bool kOpenSslModeMoveBufferOwnership = true;
  #else
  constexpr bool kOpenSslModeMoveBufferOwnership = false;
  #endif
  33. namespace folly {
  34. class AsyncSocketException;
  35. class EventBase;
  36. class SocketAddress;
  /*
   * Flags the application may pass to the write* calls.
   *
   * Each flag occupies its own bit, so values can be combined with the
   * bitwise helpers declared for this type.
   */
  enum class WriteFlags : uint32_t {
    // No flags set.
    NONE = 0,

    // Delay the output until a subsequent non-corked write.
    // (Note: may not be supported in all subclasses or on all platforms.)
    CORK = 1u << 0,

    // For a socket that has ACK latency enabled, causes the kernel to fire a
    // TCP ESTATS event when the last byte of the given write call is
    // acknowledged.
    EOR = 1u << 1,

    // Only the write side of the socket should be shut down.
    WRITE_SHUTDOWN = 1u << 2,

    // Use msg zerocopy if allowed.
    WRITE_MSG_ZEROCOPY = 1u << 3,
  };
  62. /*
  63. * union operator
  64. */
  65. inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
  66. return static_cast<WriteFlags>(
  67. static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
  68. }
  69. /*
  70. * compound assignment union operator
  71. */
  72. inline WriteFlags& operator|=(WriteFlags& a, WriteFlags b) {
  73. a = a | b;
  74. return a;
  75. }
  76. /*
  77. * intersection operator
  78. */
  79. inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
  80. return static_cast<WriteFlags>(
  81. static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
  82. }
  83. /*
  84. * compound assignment intersection operator
  85. */
  86. inline WriteFlags& operator&=(WriteFlags& a, WriteFlags b) {
  87. a = a & b;
  88. return a;
  89. }
  90. /*
  91. * exclusion parameter
  92. */
  93. inline WriteFlags operator~(WriteFlags a) {
  94. return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
  95. }
  96. /*
  97. * unset operator
  98. */
  99. inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
  100. return a & ~b;
  101. }
  102. /*
  103. * inclusion operator
  104. */
  105. inline bool isSet(WriteFlags a, WriteFlags b) {
  106. return (a & b) == b;
  107. }
  108. /**
  109. * AsyncTransport defines an asynchronous API for streaming I/O.
  110. *
  111. * This class provides an API to for asynchronously waiting for data
  112. * on a streaming transport, and for asynchronously sending data.
  113. *
  114. * The APIs for reading and writing are intentionally asymmetric. Waiting for
  115. * data to read is a persistent API: a callback is installed, and is notified
  116. * whenever new data is available. It continues to be notified of new events
  117. * until it is uninstalled.
  118. *
  119. * AsyncTransport does not provide read timeout functionality, because it
  120. * typically cannot determine when the timeout should be active. Generally, a
  121. * timeout should only be enabled when processing is blocked waiting on data
  122. * from the remote endpoint. For server-side applications, the timeout should
  123. * not be active if the server is currently processing one or more outstanding
  124. * requests on this transport. For client-side applications, the timeout
  125. * should not be active if there are no requests pending on the transport.
  126. * Additionally, if a client has multiple pending requests, it will ususally
  127. * want a separate timeout for each request, rather than a single read timeout.
  128. *
  129. * The write API is fairly intuitive: a user can request to send a block of
  130. * data, and a callback will be informed once the entire block has been
  131. * transferred to the kernel, or on error. AsyncTransport does provide a send
  132. * timeout, since most callers want to give up if the remote end stops
  133. * responding and no further progress can be made sending the data.
  134. */
  135. class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
  136. public:
  137. typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
  138. /**
  139. * Close the transport.
  140. *
  141. * This gracefully closes the transport, waiting for all pending write
  142. * requests to complete before actually closing the underlying transport.
  143. *
  144. * If a read callback is set, readEOF() will be called immediately. If there
  145. * are outstanding write requests, the close will be delayed until all
  146. * remaining writes have completed. No new writes may be started after
  147. * close() has been called.
  148. */
  149. virtual void close() = 0;
  150. /**
  151. * Close the transport immediately.
  152. *
  153. * This closes the transport immediately, dropping any outstanding data
  154. * waiting to be written.
  155. *
  156. * If a read callback is set, readEOF() will be called immediately.
  157. * If there are outstanding write requests, these requests will be aborted
  158. * and writeError() will be invoked immediately on all outstanding write
  159. * callbacks.
  160. */
  161. virtual void closeNow() = 0;
  162. /**
  163. * Reset the transport immediately.
  164. *
  165. * This closes the transport immediately, sending a reset to the remote peer
  166. * if possible to indicate abnormal shutdown.
  167. *
  168. * Note that not all subclasses implement this reset functionality: some
  169. * subclasses may treat reset() the same as closeNow(). Subclasses that use
  170. * TCP transports should terminate the connection with a TCP reset.
  171. */
  172. virtual void closeWithReset() {
  173. closeNow();
  174. }
  175. /**
  176. * Perform a half-shutdown of the write side of the transport.
  177. *
  178. * The caller should not make any more calls to write() or writev() after
  179. * shutdownWrite() is called. Any future write attempts will fail
  180. * immediately.
  181. *
  182. * Not all transport types support half-shutdown. If the underlying
  183. * transport does not support half-shutdown, it will fully shutdown both the
  184. * read and write sides of the transport. (Fully shutting down the socket is
  185. * better than doing nothing at all, since the caller may rely on the
  186. * shutdownWrite() call to notify the other end of the connection that no
  187. * more data can be read.)
  188. *
  189. * If there is pending data still waiting to be written on the transport,
  190. * the actual shutdown will be delayed until the pending data has been
  191. * written.
  192. *
  193. * Note: There is no corresponding shutdownRead() equivalent. Simply
  194. * uninstall the read callback if you wish to stop reading. (On TCP sockets
  195. * at least, shutting down the read side of the socket is a no-op anyway.)
  196. */
  197. virtual void shutdownWrite() = 0;
  198. /**
  199. * Perform a half-shutdown of the write side of the transport.
  200. *
  201. * shutdownWriteNow() is identical to shutdownWrite(), except that it
  202. * immediately performs the shutdown, rather than waiting for pending writes
  203. * to complete. Any pending write requests will be immediately failed when
  204. * shutdownWriteNow() is called.
  205. */
  206. virtual void shutdownWriteNow() = 0;
  207. /**
  208. * Determine if transport is open and ready to read or write.
  209. *
  210. * Note that this function returns false on EOF; you must also call error()
  211. * to distinguish between an EOF and an error.
  212. *
  213. * @return true iff the transport is open and ready, false otherwise.
  214. */
  215. virtual bool good() const = 0;
  216. /**
  217. * Determine if the transport is readable or not.
  218. *
  219. * @return true iff the transport is readable, false otherwise.
  220. */
  221. virtual bool readable() const = 0;
  222. /**
  223. * Determine if the transport is writable or not.
  224. *
  225. * @return true iff the transport is writable, false otherwise.
  226. */
  227. virtual bool writable() const {
  228. // By default return good() - leave it to implementers to override.
  229. return good();
  230. }
  231. /**
  232. * Determine if the there is pending data on the transport.
  233. *
  234. * @return true iff the if the there is pending data, false otherwise.
  235. */
  236. virtual bool isPending() const {
  237. return readable();
  238. }
  239. /**
  240. * Determine if transport is connected to the endpoint
  241. *
  242. * @return false iff the transport is connected, otherwise true
  243. */
  244. virtual bool connecting() const = 0;
  245. /**
  246. * Determine if an error has occurred with this transport.
  247. *
  248. * @return true iff an error has occurred (not EOF).
  249. */
  250. virtual bool error() const = 0;
  251. /**
  252. * Attach the transport to a EventBase.
  253. *
  254. * This may only be called if the transport is not currently attached to a
  255. * EventBase (by an earlier call to detachEventBase()).
  256. *
  257. * This method must be invoked in the EventBase's thread.
  258. */
  259. virtual void attachEventBase(EventBase* eventBase) = 0;
  260. /**
  261. * Detach the transport from its EventBase.
  262. *
  263. * This may only be called when the transport is idle and has no reads or
  264. * writes pending. Once detached, the transport may not be used again until
  265. * it is re-attached to a EventBase by calling attachEventBase().
  266. *
  267. * This method must be called from the current EventBase's thread.
  268. */
  269. virtual void detachEventBase() = 0;
  270. /**
  271. * Determine if the transport can be detached.
  272. *
  273. * This method must be called from the current EventBase's thread.
  274. */
  275. virtual bool isDetachable() const = 0;
  276. /**
  277. * Set the send timeout.
  278. *
  279. * If write requests do not make any progress for more than the specified
  280. * number of milliseconds, fail all pending writes and close the transport.
  281. *
  282. * If write requests are currently pending when setSendTimeout() is called,
  283. * the timeout interval is immediately restarted using the new value.
  284. *
  285. * @param milliseconds The timeout duration, in milliseconds. If 0, no
  286. * timeout will be used.
  287. */
  288. virtual void setSendTimeout(uint32_t milliseconds) = 0;
  289. /**
  290. * Get the send timeout.
  291. *
  292. * @return Returns the current send timeout, in milliseconds. A return value
  293. * of 0 indicates that no timeout is set.
  294. */
  295. virtual uint32_t getSendTimeout() const = 0;
  296. /**
  297. * Get the address of the local endpoint of this transport.
  298. *
  299. * This function may throw AsyncSocketException on error.
  300. *
  301. * @param address The local address will be stored in the specified
  302. * SocketAddress.
  303. */
  304. virtual void getLocalAddress(SocketAddress* address) const = 0;
  305. /**
  306. * Get the address of the remote endpoint to which this transport is
  307. * connected.
  308. *
  309. * This function may throw AsyncSocketException on error.
  310. *
  311. * @return Return the local address
  312. */
  313. SocketAddress getLocalAddress() const {
  314. SocketAddress addr;
  315. getLocalAddress(&addr);
  316. return addr;
  317. }
  318. void getAddress(SocketAddress* address) const override {
  319. getLocalAddress(address);
  320. }
  321. /**
  322. * Get the address of the remote endpoint to which this transport is
  323. * connected.
  324. *
  325. * This function may throw AsyncSocketException on error.
  326. *
  327. * @param address The remote endpoint's address will be stored in the
  328. * specified SocketAddress.
  329. */
  330. virtual void getPeerAddress(SocketAddress* address) const = 0;
  331. /**
  332. * Get the address of the remote endpoint to which this transport is
  333. * connected.
  334. *
  335. * This function may throw AsyncSocketException on error.
  336. *
  337. * @return Return the remote endpoint's address
  338. */
  339. SocketAddress getPeerAddress() const {
  340. SocketAddress addr;
  341. getPeerAddress(&addr);
  342. return addr;
  343. }
  344. /**
  345. * Get the certificate used to authenticate the peer.
  346. */
  347. virtual ssl::X509UniquePtr getPeerCert() const {
  348. return nullptr;
  349. }
  350. /**
  351. * The local certificate used for this connection. May be null
  352. */
  353. virtual const X509* getSelfCert() const {
  354. return nullptr;
  355. }
  356. /**
  357. * Get the peer certificate information if any
  358. */
  359. virtual const AsyncTransportCertificate* getPeerCertificate() const {
  360. return nullptr;
  361. }
  362. /**
  363. * Get the certificate information of this transport, if any
  364. */
  365. virtual const AsyncTransportCertificate* getSelfCertificate() const {
  366. return nullptr;
  367. }
  368. /**
  369. * Return the application protocol being used by the underlying transport
  370. * protocol. This is useful for transports which are used to tunnel other
  371. * protocols.
  372. */
  373. virtual std::string getApplicationProtocol() const noexcept {
  374. return "";
  375. }
  376. /**
  377. * Returns the name of the security protocol being used.
  378. */
  379. virtual std::string getSecurityProtocol() const {
  380. return "";
  381. }
  382. /**
  383. * @return True iff end of record tracking is enabled
  384. */
  385. virtual bool isEorTrackingEnabled() const = 0;
  386. virtual void setEorTracking(bool track) = 0;
  387. virtual size_t getAppBytesWritten() const = 0;
  388. virtual size_t getRawBytesWritten() const = 0;
  389. virtual size_t getAppBytesReceived() const = 0;
  390. virtual size_t getRawBytesReceived() const = 0;
  391. class BufferCallback {
  392. public:
  393. virtual ~BufferCallback() {}
  394. virtual void onEgressBuffered() = 0;
  395. virtual void onEgressBufferCleared() = 0;
  396. };
  397. /**
  398. * Callback class to signal when a transport that did not have replay
  399. * protection gains replay protection. This is needed for 0-RTT security
  400. * protocols.
  401. */
  402. class ReplaySafetyCallback {
  403. public:
  404. virtual ~ReplaySafetyCallback() = default;
  405. /**
  406. * Called when the transport becomes replay safe.
  407. */
  408. virtual void onReplaySafe() = 0;
  409. };
  410. /**
  411. * False if the transport does not have replay protection, but will in the
  412. * future.
  413. */
  414. virtual bool isReplaySafe() const {
  415. return true;
  416. }
  417. /**
  418. * Set the ReplaySafeCallback on this transport.
  419. *
  420. * This should only be called if isReplaySafe() returns false.
  421. */
  422. virtual void setReplaySafetyCallback(ReplaySafetyCallback* callback) {
  423. if (callback) {
  424. CHECK(false) << "setReplaySafetyCallback() not supported";
  425. }
  426. }
  427. protected:
  428. ~AsyncTransport() override = default;
  429. };
  430. class AsyncReader {
  431. public:
  432. class ReadCallback {
  433. public:
  434. virtual ~ReadCallback() = default;
  435. /**
  436. * When data becomes available, getReadBuffer() will be invoked to get the
  437. * buffer into which data should be read.
  438. *
  439. * This method allows the ReadCallback to delay buffer allocation until
  440. * data becomes available. This allows applications to manage large
  441. * numbers of idle connections, without having to maintain a separate read
  442. * buffer for each idle connection.
  443. *
  444. * It is possible that in some cases, getReadBuffer() may be called
  445. * multiple times before readDataAvailable() is invoked. In this case, the
  446. * data will be written to the buffer returned from the most recent call to
  447. * readDataAvailable(). If the previous calls to readDataAvailable()
  448. * returned different buffers, the ReadCallback is responsible for ensuring
  449. * that they are not leaked.
  450. *
  451. * If getReadBuffer() throws an exception, returns a nullptr buffer, or
  452. * returns a 0 length, the ReadCallback will be uninstalled and its
  453. * readError() method will be invoked.
  454. *
  455. * getReadBuffer() is not allowed to change the transport state before it
  456. * returns. (For example, it should never uninstall the read callback, or
  457. * set a different read callback.)
  458. *
  459. * @param bufReturn getReadBuffer() should update *bufReturn to contain the
  460. * address of the read buffer. This parameter will never
  461. * be nullptr.
  462. * @param lenReturn getReadBuffer() should update *lenReturn to contain the
  463. * maximum number of bytes that may be written to the read
  464. * buffer. This parameter will never be nullptr.
  465. */
  466. virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
  467. /**
  468. * readDataAvailable() will be invoked when data has been successfully read
  469. * into the buffer returned by the last call to getReadBuffer().
  470. *
  471. * The read callback remains installed after readDataAvailable() returns.
  472. * It must be explicitly uninstalled to stop receiving read events.
  473. * getReadBuffer() will be called at least once before each call to
  474. * readDataAvailable(). getReadBuffer() will also be called before any
  475. * call to readEOF().
  476. *
  477. * @param len The number of bytes placed in the buffer.
  478. */
  479. virtual void readDataAvailable(size_t len) noexcept = 0;
  480. /**
  481. * When data becomes available, isBufferMovable() will be invoked to figure
  482. * out which API will be used, readBufferAvailable() or
  483. * readDataAvailable(). If isBufferMovable() returns true, that means
  484. * ReadCallback supports the IOBuf ownership transfer and
  485. * readBufferAvailable() will be used. Otherwise, not.
  486. * By default, isBufferMovable() always return false. If
  487. * readBufferAvailable() is implemented and to be invoked, You should
  488. * overwrite isBufferMovable() and return true in the inherited class.
  489. *
  490. * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
  491. * itself until data becomes available. Compared with the pre/post buffer
  492. * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
  493. * has two advantages. First, this can avoid memcpy. E.g., in
  494. * AsyncSSLSocket, the decrypted data was copied from the openssl internal
  495. * buffer to the readbuf buffer. With the buffer ownership transfer, the
  496. * internal buffer can be directly "moved" to ReadCallback. Second, the
  497. * memory allocation can be more precise. The reason is
  498. * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
  499. * because they have more context about the available data than
  500. * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes
  501. * buffer, but the available data is always 16KB (max OpenSSL record size).
  502. */
  503. virtual bool isBufferMovable() noexcept {
  504. return false;
  505. }
  506. /**
  507. * Suggested buffer size, allocated for read operations,
  508. * if callback is movable and supports folly::IOBuf
  509. */
  510. virtual size_t maxBufferSize() const {
  511. return 64 * 1024; // 64K
  512. }
  513. /**
  514. * readBufferAvailable() will be invoked when data has been successfully
  515. * read.
  516. *
  517. * Note that only either readBufferAvailable() or readDataAvailable() will
  518. * be invoked according to the return value of isBufferMovable(). The timing
  519. * and aftereffect of readBufferAvailable() are the same as
  520. * readDataAvailable()
  521. *
  522. * @param readBuf The unique pointer of read buffer.
  523. */
  524. virtual void readBufferAvailable(
  525. std::unique_ptr<IOBuf> /*readBuf*/) noexcept {}
  526. /**
  527. * readEOF() will be invoked when the transport is closed.
  528. *
  529. * The read callback will be automatically uninstalled immediately before
  530. * readEOF() is invoked.
  531. */
  532. virtual void readEOF() noexcept = 0;
  533. /**
  534. * readError() will be invoked if an error occurs reading from the
  535. * transport.
  536. *
  537. * The read callback will be automatically uninstalled immediately before
  538. * readError() is invoked.
  539. *
  540. * @param ex An exception describing the error that occurred.
  541. */
  542. virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
  543. };
  544. // Read methods that aren't part of AsyncTransport.
  545. virtual void setReadCB(ReadCallback* callback) = 0;
  546. virtual ReadCallback* getReadCallback() const = 0;
  547. protected:
  548. virtual ~AsyncReader() = default;
  549. };
  550. class AsyncWriter {
  551. public:
  552. class WriteCallback {
  553. public:
  554. virtual ~WriteCallback() = default;
  555. /**
  556. * writeSuccess() will be invoked when all of the data has been
  557. * successfully written.
  558. *
  559. * Note that this mainly signals that the buffer containing the data to
  560. * write is no longer needed and may be freed or re-used. It does not
  561. * guarantee that the data has been fully transmitted to the remote
  562. * endpoint. For example, on socket-based transports, writeSuccess() only
  563. * indicates that the data has been given to the kernel for eventual
  564. * transmission.
  565. */
  566. virtual void writeSuccess() noexcept = 0;
  567. /**
  568. * writeError() will be invoked if an error occurs writing the data.
  569. *
  570. * @param bytesWritten The number of bytes that were successfull
  571. * @param ex An exception describing the error that occurred.
  572. */
  573. virtual void writeErr(
  574. size_t bytesWritten,
  575. const AsyncSocketException& ex) noexcept = 0;
  576. };
  577. /**
  578. * If you supply a non-null WriteCallback, exactly one of writeSuccess()
  579. * or writeErr() will be invoked when the write completes. If you supply
  580. * the same WriteCallback object for multiple write() calls, it will be
  581. * invoked exactly once per call. The only way to cancel outstanding
  582. * write requests is to close the socket (e.g., with closeNow() or
  583. * shutdownWriteNow()). When closing the socket this way, writeErr() will
  584. * still be invoked once for each outstanding write operation.
  585. */
  586. virtual void write(
  587. WriteCallback* callback,
  588. const void* buf,
  589. size_t bytes,
  590. WriteFlags flags = WriteFlags::NONE) = 0;
  591. /**
  592. * If you supply a non-null WriteCallback, exactly one of writeSuccess()
  593. * or writeErr() will be invoked when the write completes. If you supply
  594. * the same WriteCallback object for multiple write() calls, it will be
  595. * invoked exactly once per call. The only way to cancel outstanding
  596. * write requests is to close the socket (e.g., with closeNow() or
  597. * shutdownWriteNow()). When closing the socket this way, writeErr() will
  598. * still be invoked once for each outstanding write operation.
  599. */
  600. virtual void writev(
  601. WriteCallback* callback,
  602. const iovec* vec,
  603. size_t count,
  604. WriteFlags flags = WriteFlags::NONE) = 0;
  605. /**
  606. * If you supply a non-null WriteCallback, exactly one of writeSuccess()
  607. * or writeErr() will be invoked when the write completes. If you supply
  608. * the same WriteCallback object for multiple write() calls, it will be
  609. * invoked exactly once per call. The only way to cancel outstanding
  610. * write requests is to close the socket (e.g., with closeNow() or
  611. * shutdownWriteNow()). When closing the socket this way, writeErr() will
  612. * still be invoked once for each outstanding write operation.
  613. */
  614. virtual void writeChain(
  615. WriteCallback* callback,
  616. std::unique_ptr<IOBuf>&& buf,
  617. WriteFlags flags = WriteFlags::NONE) = 0;
  618. protected:
  619. virtual ~AsyncWriter() = default;
  620. };
  621. // Transitional intermediate interface. This is deprecated.
  622. // Wrapper around folly::AsyncTransport, that includes read/write callbacks
  623. class AsyncTransportWrapper : virtual public AsyncTransport,
  624. virtual public AsyncReader,
  625. virtual public AsyncWriter {
  626. public:
  627. using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
  628. // Alias for inherited members from AsyncReader and AsyncWriter
  629. // to keep compatibility.
  630. using ReadCallback = AsyncReader::ReadCallback;
  631. using WriteCallback = AsyncWriter::WriteCallback;
  632. void setReadCB(ReadCallback* callback) override = 0;
  633. ReadCallback* getReadCallback() const override = 0;
  634. void write(
  635. WriteCallback* callback,
  636. const void* buf,
  637. size_t bytes,
  638. WriteFlags flags = WriteFlags::NONE) override = 0;
  639. void writev(
  640. WriteCallback* callback,
  641. const iovec* vec,
  642. size_t count,
  643. WriteFlags flags = WriteFlags::NONE) override = 0;
  644. void writeChain(
  645. WriteCallback* callback,
  646. std::unique_ptr<IOBuf>&& buf,
  647. WriteFlags flags = WriteFlags::NONE) override = 0;
  648. /**
  649. * The transport wrapper may wrap another transport. This returns the
  650. * transport that is wrapped. It returns nullptr if there is no wrapped
  651. * transport.
  652. */
  653. virtual const AsyncTransportWrapper* getWrappedTransport() const {
  654. return nullptr;
  655. }
  656. /**
  657. * In many cases when we need to set socket properties or otherwise access the
  658. * underlying transport from a wrapped transport. This method allows access to
  659. * the derived classes of the underlying transport.
  660. */
  661. template <class T>
  662. const T* getUnderlyingTransport() const {
  663. const AsyncTransportWrapper* current = this;
  664. while (current) {
  665. auto sock = dynamic_cast<const T*>(current);
  666. if (sock) {
  667. return sock;
  668. }
  669. current = current->getWrappedTransport();
  670. }
  671. return nullptr;
  672. }
  673. template <class T>
  674. T* getUnderlyingTransport() {
  675. return const_cast<T*>(static_cast<const AsyncTransportWrapper*>(this)
  676. ->getUnderlyingTransport<T>());
  677. }
  678. };
  679. } // namespace folly