123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #pragma once
- /*
- * Copyright 2018-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <folly/experimental/pushmi/flow_many_sender.h>
- #include <folly/experimental/pushmi/many_sender.h>
- #include <folly/experimental/pushmi/o/extension_operators.h>
- #include <folly/experimental/pushmi/o/submit.h>
- #include <folly/experimental/pushmi/trampoline.h>
- namespace pushmi {
- PUSHMI_CONCEPT_DEF(
- template(class R)
- concept Range,
- requires(R&& r)(
- implicitly_convertible_to<bool>(std::begin(r) == std::end(r)))
- );
- namespace operators {
- PUSHMI_INLINE_VAR constexpr struct from_fn {
- private:
- struct sender_base : many_sender<ignoreSF, inlineEXF> {
- using properties = property_set<
- is_sender<>,
- is_many<>,
- is_always_blocking<>,
- is_fifo_sequence<>>;
- };
- template <class I, class S>
- struct out_impl {
- I begin_;
- S end_;
- PUSHMI_TEMPLATE(class Out)
- (requires ReceiveValue<
- Out,
- typename std::iterator_traits<I>::value_type>)
- void operator()(sender_base&, Out out) const {
- auto c = begin_;
- for (; c != end_; ++c) {
- ::pushmi::set_value(out, *c);
- }
- ::pushmi::set_done(out);
- }
- };
- public:
- PUSHMI_TEMPLATE(class I, class S)
- (requires DerivedFrom<
- typename std::iterator_traits<I>::iterator_category,
- std::forward_iterator_tag>)
- auto operator()(I begin, S end) const {
- return make_many_sender(sender_base{}, out_impl<I, S>{begin, end});
- }
- PUSHMI_TEMPLATE(class R)
- (requires Range<R>)
- auto operator()(R&& range) const {
- return (*this)(std::begin(range), std::end(range));
- }
- } from{};
// State shared between one flow_from submission and the flow_from_up
// receiver it hands downstream.
//   c    - cursor; initialised from `begin` and advanced as items are emitted
//   end  - sentinel the cursor is compared against
//   out  - the downstream receiver
//   exec - executor that work for this submission is submitted to
//   stop - atomic flag read by the emit loop and set on done()/error()
template <class I, class S, class Out, class Exec>
struct flow_from_producer {
  I c;
  S end;
  Out out;
  Exec exec;
  std::atomic<bool> stop;

  // `begin` and `end` are copied; `out` and `exec` are moved into place;
  // `s` is the initial value of the stop flag.
  flow_from_producer(I begin, S end, Out out, Exec exec, bool s)
      : c(begin),
        end(end),
        out(std::move(out)),
        exec(std::move(exec)),
        stop(s) {}
};
- template <class Producer>
- struct flow_from_up {
- using properties = properties_t<receiver<>>;
- explicit flow_from_up(std::shared_ptr<Producer> p) : p(std::move(p)) {}
- std::shared_ptr<Producer> p;
- void value(std::ptrdiff_t requested) {
- if (requested < 1) {
- return;
- }
- // submit work to exec
- ::pushmi::submit(p->exec, make_receiver([p = p, requested](auto) {
- auto remaining = requested;
- // this loop is structured to work when there is
- // re-entrancy out.value in the loop may call up.value.
- // to handle this the state of p->c must be captured and
- // the remaining and p->c must be changed before
- // out.value is called.
- while (remaining-- > 0 && !p->stop && p->c != p->end) {
- auto i = (p->c)++;
- ::pushmi::set_value(
- p->out, ::pushmi::detail::as_const(*i));
- }
- if (p->c == p->end) {
- ::pushmi::set_done(p->out);
- }
- }));
- }
- template <class E>
- void error(E) noexcept {
- p->stop.store(true);
- ::pushmi::submit(
- p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); }));
- }
- void done() {
- p->stop.store(true);
- ::pushmi::submit(
- p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); }));
- }
- };
- PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
- private:
- template <class I, class S, class Exec>
- struct out_impl {
- I begin_;
- S end_;
- mutable Exec exec_;
- PUSHMI_TEMPLATE(class Out)
- (requires ReceiveValue<
- Out,
- typename std::iterator_traits<I>::value_type>)
- void operator()(Out out) const {
- using Producer = flow_from_producer<I, S, Out, Exec>;
- auto p = std::make_shared<Producer>(
- begin_, end_, std::move(out), exec_, false);
- ::pushmi::submit(exec_, make_receiver([p](auto) {
- // pass reference for cancellation.
- ::pushmi::set_starting(
- p->out, make_receiver(flow_from_up<Producer>{p}));
- }));
- }
- };
- public:
- PUSHMI_TEMPLATE(class I, class S)
- (requires DerivedFrom<
- typename std::iterator_traits<I>::iterator_category,
- std::forward_iterator_tag>)
- auto operator()(I begin, S end) const {
- return (*this)(begin, end, trampoline());
- }
- PUSHMI_TEMPLATE(class R)
- (requires Range<R>)
- auto operator()(R&& range) const {
- return (*this)(std::begin(range), std::end(range), trampoline());
- }
- PUSHMI_TEMPLATE(class I, class S, class Exec)
- (requires DerivedFrom<
- typename std::iterator_traits<I>::iterator_category,
- std::forward_iterator_tag>&& Sender<Exec, is_single<>, is_executor<>>)
- auto operator()(I begin, S end, Exec exec) const {
- return make_flow_many_sender(out_impl<I, S, Exec>{begin, end, exec});
- }
- PUSHMI_TEMPLATE(class R, class Exec)
- (requires Range<R>&& Sender<Exec, is_single<>, is_executor<>>)
- auto operator()(
- R&& range,
- Exec exec) const {
- return (*this)(std::begin(range), std::end(range), exec);
- }
- } flow_from{};
- } // namespace operators
- } // namespace pushmi
|