123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- /*
- * Copyright 2013-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #pragma once
- #include <utility>
- #include <glog/logging.h>
- #include <folly/Portability.h>
- #include <folly/detail/MPMCPipelineDetail.h>
- namespace folly {
- /**
- * Helper tag template to use amplification > 1
- */
- template <class T, size_t Amp>
- class MPMCPipelineStage;
- /**
- * Multi-Producer, Multi-Consumer pipeline.
- *
- * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
- *
- * At each stage, you may dequeue the results from the previous stage (possibly
- * from multiple threads) and enqueue results to the next stage. Regardless of
- * the order of completion, data is delivered to the next stage in the original
- * order. Each input is matched with a "ticket" which must be produced
- * when enqueueing to the next stage.
- *
- * A given stage must produce exactly K ("amplification factor", default K=1)
- * results for every input. This is enforced by requiring that each ticket
- * is used exactly K times.
- *
- * Usage:
- *
- * // arguments are queue sizes
- * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
- *
- * pipeline.blockingWrite(42);
- *
- * {
- * int val;
- * auto ticket = pipeline.blockingReadStage<0>(val);
- * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
- * }
- *
- * {
- * std::string val;
- * auto ticket = pipeline.blockingReadStage<1>(val);
- * int ival = 0;
- * try {
- * ival = folly::to<int>(val);
- * } catch (...) {
- * // We must produce exactly 1 output even on exception!
- * }
- * pipeline.blockingWriteStage<1>(ticket, ival);
- * }
- *
- * int result;
- * pipeline.blockingRead(result);
- * // result == 42
- *
- * To specify amplification factors greater than 1, use
- * MPMCPipelineStage<T, amplification> instead of T in the declaration:
- *
- * MPMCPipeline<int,
- * MPMCPipelineStage<std::string, 2>,
- * MPMCPipelineStage<int, 4>>
- *
- * declares a two-stage pipeline: the first stage produces 2 strings
- * for each input int, the second stage produces 4 ints for each input string,
- * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
- *
- * Implementation details: we use N+1 MPMCQueue objects; each intermediate
- * queue connects two adjacent stages. The MPMCQueue implementation is abused;
- * instead of using it as a queue, we insert in the output queue at the
- * position determined by the input queue's popTicket_. We guarantee that
- * all slots are filled (and therefore the queue doesn't freeze) because
- * we require that each step produces exactly K outputs for every input.
- */
- template <class In, class... Stages>
- class MPMCPipeline {
- typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
- typedef std::tuple<
- detail::MPMCPipelineStageImpl<In>,
- detail::MPMCPipelineStageImpl<
- typename detail::PipelineStageInfo<Stages>::value_type>...>
- StageTuple;
- static constexpr size_t kAmplification =
- detail::AmplificationProduct<StageInfos>::value;
- class TicketBaseDebug {
- public:
- TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {}
- TicketBaseDebug(TicketBaseDebug&& other) noexcept
- : owner_(std::exchange(other.owner_, nullptr)),
- value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {}
- explicit TicketBaseDebug(MPMCPipeline* owner, uint64_t value) noexcept
- : owner_(owner), value_(value) {}
- void check_owner(MPMCPipeline* owner) const {
- CHECK(owner == owner_);
- }
- MPMCPipeline* owner_;
- uint64_t value_;
- };
- class TicketBaseNDebug {
- public:
- TicketBaseNDebug() = default;
- TicketBaseNDebug(TicketBaseNDebug&&) = default;
- explicit TicketBaseNDebug(MPMCPipeline*, uint64_t value) noexcept
- : value_(value) {}
- void check_owner(MPMCPipeline*) const {}
- uint64_t value_;
- };
- using TicketBase =
- std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>;
- public:
- /**
- * Ticket, returned by blockingReadStage, must be given back to
- * blockingWriteStage. Tickets are not thread-safe.
- */
- template <size_t Stage>
- class Ticket : TicketBase {
- public:
- ~Ticket() noexcept {
- CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
- }
- Ticket() noexcept : remainingUses_(0) {}
- Ticket(Ticket&& other) noexcept
- : TicketBase(static_cast<TicketBase&&>(other)),
- remainingUses_(std::exchange(other.remainingUses_, 0)) {}
- Ticket& operator=(Ticket&& other) noexcept {
- if (this != &other) {
- this->~Ticket();
- new (this) Ticket(std::move(other));
- }
- return *this;
- }
- private:
- friend class MPMCPipeline;
- size_t remainingUses_;
- Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
- : TicketBase(owner, value * amplification),
- remainingUses_(amplification) {}
- uint64_t use(MPMCPipeline* owner) {
- CHECK_GT(remainingUses_--, 0);
- TicketBase::check_owner(owner);
- return TicketBase::value_++;
- }
- };
- /**
- * Default-construct pipeline. Useful to move-assign later,
- * just like MPMCQueue, see MPMCQueue.h for more details.
- */
- MPMCPipeline() = default;
- /**
- * Construct a pipeline with N+1 queue sizes.
- */
- template <class... Sizes>
- explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {}
- /**
- * Push an element into (the first stage of) the pipeline. Blocking.
- */
- template <class... Args>
- void blockingWrite(Args&&... args) {
- std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
- }
- /**
- * Try to push an element into (the first stage of) the pipeline.
- * Non-blocking.
- */
- template <class... Args>
- bool write(Args&&... args) {
- return std::get<0>(stages_).write(std::forward<Args>(args)...);
- }
- /**
- * Read an element for stage Stage and obtain a ticket. Blocking.
- */
- template <size_t Stage>
- Ticket<Stage> blockingReadStage(
- typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
- return Ticket<Stage>(
- this,
- std::tuple_element<Stage, StageInfos>::type::kAmplification,
- std::get<Stage>(stages_).blockingRead(elem));
- }
- /**
- * Try to read an element for stage Stage and obtain a ticket.
- * Non-blocking.
- */
- template <size_t Stage>
- bool readStage(
- Ticket<Stage>& ticket,
- typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
- uint64_t tval;
- if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
- return false;
- }
- ticket = Ticket<Stage>(
- this,
- std::tuple_element<Stage, StageInfos>::type::kAmplification,
- tval);
- return true;
- }
- /**
- * Complete an element in stage Stage (pushing it for stage Stage+1).
- * Blocking.
- */
- template <size_t Stage, class... Args>
- void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
- std::get<Stage + 1>(stages_).blockingWriteWithTicket(
- ticket.use(this), std::forward<Args>(args)...);
- }
- /**
- * Pop an element from (the final stage of) the pipeline. Blocking.
- */
- void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>::
- type::value_type& elem) {
- std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
- }
- /**
- * Try to pop an element from (the final stage of) the pipeline.
- * Non-blocking.
- */
- bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type::
- value_type& elem) {
- return std::get<sizeof...(Stages)>(stages_).read(elem);
- }
- /**
- * Estimate queue size, measured as values from the last stage.
- * (so if the pipeline has an amplification factor > 1, pushing an element
- * into the first stage will cause sizeGuess() to be == amplification factor)
- * Elements "in flight" (currently processed as part of a stage, so not
- * in any queue) are also counted.
- */
- ssize_t sizeGuess() const noexcept {
- return ssize_t(
- std::get<0>(stages_).writeCount() * kAmplification -
- std::get<sizeof...(Stages)>(stages_).readCount());
- }
- private:
- StageTuple stages_;
- };
- } // namespace folly
|