MPMCPipeline.h 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <utility>
  18. #include <glog/logging.h>
  19. #include <folly/Portability.h>
  20. #include <folly/detail/MPMCPipelineDetail.h>
  21. namespace folly {
  /**
   * Tag template for declaring a pipeline stage with an amplification
   * factor greater than 1: write MPMCPipelineStage<Value, Amplification>
   * in place of the plain value type in MPMCPipeline's template arguments.
   */
  template <class Value, size_t Amplification>
  class MPMCPipelineStage;
  27. /**
  28. * Multi-Producer, Multi-Consumer pipeline.
  29. *
  30. * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
  31. *
  32. * At each stage, you may dequeue the results from the previous stage (possibly
  33. * from multiple threads) and enqueue results to the next stage. Regardless of
  34. * the order of completion, data is delivered to the next stage in the original
  35. * order. Each input is matched with a "ticket" which must be produced
  36. * when enqueueing to the next stage.
  37. *
  38. * A given stage must produce exactly K ("amplification factor", default K=1)
  39. * results for every input. This is enforced by requiring that each ticket
  40. * is used exactly K times.
  41. *
  42. * Usage:
  43. *
  44. * // arguments are queue sizes
  45. * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
  46. *
  47. * pipeline.blockingWrite(42);
  48. *
  49. * {
  50. * int val;
  51. * auto ticket = pipeline.blockingReadStage<0>(val);
  52. * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
  53. * }
  54. *
  55. * {
  56. * std::string val;
  57. * auto ticket = pipeline.blockingReadStage<1>(val);
  58. * int ival = 0;
  59. * try {
  60. * ival = folly::to<int>(val);
  61. * } catch (...) {
  62. * // We must produce exactly 1 output even on exception!
  63. * }
  64. * pipeline.blockingWriteStage<1>(ticket, ival);
  65. * }
  66. *
  67. * int result;
  68. * pipeline.blockingRead(result);
  69. * // result == 42
  70. *
  71. * To specify amplification factors greater than 1, use
  72. * MPMCPipelineStage<T, amplification> instead of T in the declaration:
  73. *
  74. * MPMCPipeline<int,
  75. * MPMCPipelineStage<std::string, 2>,
  76. * MPMCPipelineStage<int, 4>>
  77. *
  78. * declares a two-stage pipeline: the first stage produces 2 strings
  79. * for each input int, the second stage produces 4 ints for each input string,
  80. * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
  81. *
  82. * Implementation details: we use N+1 MPMCQueue objects; each intermediate
  83. * queue connects two adjacent stages. The MPMCQueue implementation is abused;
  84. * instead of using it as a queue, we insert in the output queue at the
  85. * position determined by the input queue's popTicket_. We guarantee that
  86. * all slots are filled (and therefore the queue doesn't freeze) because
  87. * we require that each step produces exactly K outputs for every input.
  88. */
  89. template <class In, class... Stages>
  90. class MPMCPipeline {
  91. typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
  92. typedef std::tuple<
  93. detail::MPMCPipelineStageImpl<In>,
  94. detail::MPMCPipelineStageImpl<
  95. typename detail::PipelineStageInfo<Stages>::value_type>...>
  96. StageTuple;
  97. static constexpr size_t kAmplification =
  98. detail::AmplificationProduct<StageInfos>::value;
  99. class TicketBaseDebug {
  100. public:
  101. TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {}
  102. TicketBaseDebug(TicketBaseDebug&& other) noexcept
  103. : owner_(std::exchange(other.owner_, nullptr)),
  104. value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {}
  105. explicit TicketBaseDebug(MPMCPipeline* owner, uint64_t value) noexcept
  106. : owner_(owner), value_(value) {}
  107. void check_owner(MPMCPipeline* owner) const {
  108. CHECK(owner == owner_);
  109. }
  110. MPMCPipeline* owner_;
  111. uint64_t value_;
  112. };
  113. class TicketBaseNDebug {
  114. public:
  115. TicketBaseNDebug() = default;
  116. TicketBaseNDebug(TicketBaseNDebug&&) = default;
  117. explicit TicketBaseNDebug(MPMCPipeline*, uint64_t value) noexcept
  118. : value_(value) {}
  119. void check_owner(MPMCPipeline*) const {}
  120. uint64_t value_;
  121. };
  122. using TicketBase =
  123. std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>;
  124. public:
  125. /**
  126. * Ticket, returned by blockingReadStage, must be given back to
  127. * blockingWriteStage. Tickets are not thread-safe.
  128. */
  129. template <size_t Stage>
  130. class Ticket : TicketBase {
  131. public:
  132. ~Ticket() noexcept {
  133. CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
  134. }
  135. Ticket() noexcept : remainingUses_(0) {}
  136. Ticket(Ticket&& other) noexcept
  137. : TicketBase(static_cast<TicketBase&&>(other)),
  138. remainingUses_(std::exchange(other.remainingUses_, 0)) {}
  139. Ticket& operator=(Ticket&& other) noexcept {
  140. if (this != &other) {
  141. this->~Ticket();
  142. new (this) Ticket(std::move(other));
  143. }
  144. return *this;
  145. }
  146. private:
  147. friend class MPMCPipeline;
  148. size_t remainingUses_;
  149. Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
  150. : TicketBase(owner, value * amplification),
  151. remainingUses_(amplification) {}
  152. uint64_t use(MPMCPipeline* owner) {
  153. CHECK_GT(remainingUses_--, 0);
  154. TicketBase::check_owner(owner);
  155. return TicketBase::value_++;
  156. }
  157. };
  158. /**
  159. * Default-construct pipeline. Useful to move-assign later,
  160. * just like MPMCQueue, see MPMCQueue.h for more details.
  161. */
  162. MPMCPipeline() = default;
  163. /**
  164. * Construct a pipeline with N+1 queue sizes.
  165. */
  166. template <class... Sizes>
  167. explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {}
  168. /**
  169. * Push an element into (the first stage of) the pipeline. Blocking.
  170. */
  171. template <class... Args>
  172. void blockingWrite(Args&&... args) {
  173. std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
  174. }
  175. /**
  176. * Try to push an element into (the first stage of) the pipeline.
  177. * Non-blocking.
  178. */
  179. template <class... Args>
  180. bool write(Args&&... args) {
  181. return std::get<0>(stages_).write(std::forward<Args>(args)...);
  182. }
  183. /**
  184. * Read an element for stage Stage and obtain a ticket. Blocking.
  185. */
  186. template <size_t Stage>
  187. Ticket<Stage> blockingReadStage(
  188. typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
  189. return Ticket<Stage>(
  190. this,
  191. std::tuple_element<Stage, StageInfos>::type::kAmplification,
  192. std::get<Stage>(stages_).blockingRead(elem));
  193. }
  194. /**
  195. * Try to read an element for stage Stage and obtain a ticket.
  196. * Non-blocking.
  197. */
  198. template <size_t Stage>
  199. bool readStage(
  200. Ticket<Stage>& ticket,
  201. typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
  202. uint64_t tval;
  203. if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
  204. return false;
  205. }
  206. ticket = Ticket<Stage>(
  207. this,
  208. std::tuple_element<Stage, StageInfos>::type::kAmplification,
  209. tval);
  210. return true;
  211. }
  212. /**
  213. * Complete an element in stage Stage (pushing it for stage Stage+1).
  214. * Blocking.
  215. */
  216. template <size_t Stage, class... Args>
  217. void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
  218. std::get<Stage + 1>(stages_).blockingWriteWithTicket(
  219. ticket.use(this), std::forward<Args>(args)...);
  220. }
  221. /**
  222. * Pop an element from (the final stage of) the pipeline. Blocking.
  223. */
  224. void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>::
  225. type::value_type& elem) {
  226. std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
  227. }
  228. /**
  229. * Try to pop an element from (the final stage of) the pipeline.
  230. * Non-blocking.
  231. */
  232. bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type::
  233. value_type& elem) {
  234. return std::get<sizeof...(Stages)>(stages_).read(elem);
  235. }
  236. /**
  237. * Estimate queue size, measured as values from the last stage.
  238. * (so if the pipeline has an amplification factor > 1, pushing an element
  239. * into the first stage will cause sizeGuess() to be == amplification factor)
  240. * Elements "in flight" (currently processed as part of a stage, so not
  241. * in any queue) are also counted.
  242. */
  243. ssize_t sizeGuess() const noexcept {
  244. return ssize_t(
  245. std::get<0>(stages_).writeCount() * kAmplification -
  246. std::get<sizeof...(Stages)>(stages_).readCount());
  247. }
  248. private:
  249. StageTuple stages_;
  250. };
  251. } // namespace folly