#pragma once /* * Copyright 2018-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include namespace pushmi { PUSHMI_CONCEPT_DEF( template(class R) concept Range, requires(R&& r)( implicitly_convertible_to(std::begin(r) == std::end(r))) ); namespace operators { PUSHMI_INLINE_VAR constexpr struct from_fn { private: struct sender_base : many_sender { using properties = property_set< is_sender<>, is_many<>, is_always_blocking<>, is_fifo_sequence<>>; }; template struct out_impl { I begin_; S end_; PUSHMI_TEMPLATE(class Out) (requires ReceiveValue< Out, typename std::iterator_traits::value_type>) void operator()(sender_base&, Out out) const { auto c = begin_; for (; c != end_; ++c) { ::pushmi::set_value(out, *c); } ::pushmi::set_done(out); } }; public: PUSHMI_TEMPLATE(class I, class S) (requires DerivedFrom< typename std::iterator_traits::iterator_category, std::forward_iterator_tag>) auto operator()(I begin, S end) const { return make_many_sender(sender_base{}, out_impl{begin, end}); } PUSHMI_TEMPLATE(class R) (requires Range) auto operator()(R&& range) const { return (*this)(std::begin(range), std::end(range)); } } from{}; template struct flow_from_producer { flow_from_producer(I begin, S end, Out out, Exec exec, bool s) : c(begin), end(end), out(std::move(out)), exec(std::move(exec)), stop(s) {} I c; S end; Out out; Exec exec; std::atomic stop; }; template struct flow_from_up { using properties = properties_t>; explicit flow_from_up(std::shared_ptr p) : p(std::move(p)) {} std::shared_ptr p; void value(std::ptrdiff_t requested) { if (requested < 1) { return; } // submit work to exec ::pushmi::submit(p->exec, make_receiver([p = p, requested](auto) { auto remaining = requested; // this loop is structured to work when there is // re-entrancy out.value in the loop may call up.value. // to handle this the state of p->c must be captured and // the remaining and p->c must be changed before // out.value is called. while (remaining-- > 0 && !p->stop && p->c != p->end) { auto i = (p->c)++; ::pushmi::set_value( p->out, ::pushmi::detail::as_const(*i)); } if (p->c == p->end) { ::pushmi::set_done(p->out); } })); } template void error(E) noexcept { p->stop.store(true); ::pushmi::submit( p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); })); } void done() { p->stop.store(true); ::pushmi::submit( p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); })); } }; PUSHMI_INLINE_VAR constexpr struct flow_from_fn { private: template struct out_impl { I begin_; S end_; mutable Exec exec_; PUSHMI_TEMPLATE(class Out) (requires ReceiveValue< Out, typename std::iterator_traits::value_type>) void operator()(Out out) const { using Producer = flow_from_producer; auto p = std::make_shared( begin_, end_, std::move(out), exec_, false); ::pushmi::submit(exec_, make_receiver([p](auto) { // pass reference for cancellation. ::pushmi::set_starting( p->out, make_receiver(flow_from_up{p})); })); } }; public: PUSHMI_TEMPLATE(class I, class S) (requires DerivedFrom< typename std::iterator_traits::iterator_category, std::forward_iterator_tag>) auto operator()(I begin, S end) const { return (*this)(begin, end, trampoline()); } PUSHMI_TEMPLATE(class R) (requires Range) auto operator()(R&& range) const { return (*this)(std::begin(range), std::end(range), trampoline()); } PUSHMI_TEMPLATE(class I, class S, class Exec) (requires DerivedFrom< typename std::iterator_traits::iterator_category, std::forward_iterator_tag>&& Sender, is_executor<>>) auto operator()(I begin, S end, Exec exec) const { return make_flow_many_sender(out_impl{begin, end, exec}); } PUSHMI_TEMPLATE(class R, class Exec) (requires Range&& Sender, is_executor<>>) auto operator()( R&& range, Exec exec) const { return (*this)(std::begin(range), std::end(range), exec); } } flow_from{}; } // namespace operators } // namespace pushmi