from.h 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #pragma once
  2. /*
  3. * Copyright 2018-present Facebook, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #include <folly/experimental/pushmi/flow_many_sender.h>
  18. #include <folly/experimental/pushmi/many_sender.h>
  19. #include <folly/experimental/pushmi/o/extension_operators.h>
  20. #include <folly/experimental/pushmi/o/submit.h>
  21. #include <folly/experimental/pushmi/trampoline.h>
  22. namespace pushmi {
  23. PUSHMI_CONCEPT_DEF(
  24. template(class R)
  25. concept Range,
  26. requires(R&& r)(
  27. implicitly_convertible_to<bool>(std::begin(r) == std::end(r)))
  28. );
  29. namespace operators {
  30. PUSHMI_INLINE_VAR constexpr struct from_fn {
  31. private:
  32. struct sender_base : many_sender<ignoreSF, inlineEXF> {
  33. using properties = property_set<
  34. is_sender<>,
  35. is_many<>,
  36. is_always_blocking<>,
  37. is_fifo_sequence<>>;
  38. };
  39. template <class I, class S>
  40. struct out_impl {
  41. I begin_;
  42. S end_;
  43. PUSHMI_TEMPLATE(class Out)
  44. (requires ReceiveValue<
  45. Out,
  46. typename std::iterator_traits<I>::value_type>)
  47. void operator()(sender_base&, Out out) const {
  48. auto c = begin_;
  49. for (; c != end_; ++c) {
  50. ::pushmi::set_value(out, *c);
  51. }
  52. ::pushmi::set_done(out);
  53. }
  54. };
  55. public:
  56. PUSHMI_TEMPLATE(class I, class S)
  57. (requires DerivedFrom<
  58. typename std::iterator_traits<I>::iterator_category,
  59. std::forward_iterator_tag>)
  60. auto operator()(I begin, S end) const {
  61. return make_many_sender(sender_base{}, out_impl<I, S>{begin, end});
  62. }
  63. PUSHMI_TEMPLATE(class R)
  64. (requires Range<R>)
  65. auto operator()(R&& range) const {
  66. return (*this)(std::begin(range), std::end(range));
  67. }
  68. } from{};
  69. template <class I, class S, class Out, class Exec>
  70. struct flow_from_producer {
  71. flow_from_producer(I begin, S end, Out out, Exec exec, bool s)
  72. : c(begin),
  73. end(end),
  74. out(std::move(out)),
  75. exec(std::move(exec)),
  76. stop(s) {}
  77. I c;
  78. S end;
  79. Out out;
  80. Exec exec;
  81. std::atomic<bool> stop;
  82. };
  83. template <class Producer>
  84. struct flow_from_up {
  85. using properties = properties_t<receiver<>>;
  86. explicit flow_from_up(std::shared_ptr<Producer> p) : p(std::move(p)) {}
  87. std::shared_ptr<Producer> p;
  88. void value(std::ptrdiff_t requested) {
  89. if (requested < 1) {
  90. return;
  91. }
  92. // submit work to exec
  93. ::pushmi::submit(p->exec, make_receiver([p = p, requested](auto) {
  94. auto remaining = requested;
  95. // this loop is structured to work when there is
  96. // re-entrancy out.value in the loop may call up.value.
  97. // to handle this the state of p->c must be captured and
  98. // the remaining and p->c must be changed before
  99. // out.value is called.
  100. while (remaining-- > 0 && !p->stop && p->c != p->end) {
  101. auto i = (p->c)++;
  102. ::pushmi::set_value(
  103. p->out, ::pushmi::detail::as_const(*i));
  104. }
  105. if (p->c == p->end) {
  106. ::pushmi::set_done(p->out);
  107. }
  108. }));
  109. }
  110. template <class E>
  111. void error(E) noexcept {
  112. p->stop.store(true);
  113. ::pushmi::submit(
  114. p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); }));
  115. }
  116. void done() {
  117. p->stop.store(true);
  118. ::pushmi::submit(
  119. p->exec, make_receiver([p = p](auto) { ::pushmi::set_done(p->out); }));
  120. }
  121. };
  122. PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
  123. private:
  124. template <class I, class S, class Exec>
  125. struct out_impl {
  126. I begin_;
  127. S end_;
  128. mutable Exec exec_;
  129. PUSHMI_TEMPLATE(class Out)
  130. (requires ReceiveValue<
  131. Out,
  132. typename std::iterator_traits<I>::value_type>)
  133. void operator()(Out out) const {
  134. using Producer = flow_from_producer<I, S, Out, Exec>;
  135. auto p = std::make_shared<Producer>(
  136. begin_, end_, std::move(out), exec_, false);
  137. ::pushmi::submit(exec_, make_receiver([p](auto) {
  138. // pass reference for cancellation.
  139. ::pushmi::set_starting(
  140. p->out, make_receiver(flow_from_up<Producer>{p}));
  141. }));
  142. }
  143. };
  144. public:
  145. PUSHMI_TEMPLATE(class I, class S)
  146. (requires DerivedFrom<
  147. typename std::iterator_traits<I>::iterator_category,
  148. std::forward_iterator_tag>)
  149. auto operator()(I begin, S end) const {
  150. return (*this)(begin, end, trampoline());
  151. }
  152. PUSHMI_TEMPLATE(class R)
  153. (requires Range<R>)
  154. auto operator()(R&& range) const {
  155. return (*this)(std::begin(range), std::end(range), trampoline());
  156. }
  157. PUSHMI_TEMPLATE(class I, class S, class Exec)
  158. (requires DerivedFrom<
  159. typename std::iterator_traits<I>::iterator_category,
  160. std::forward_iterator_tag>&& Sender<Exec, is_single<>, is_executor<>>)
  161. auto operator()(I begin, S end, Exec exec) const {
  162. return make_flow_many_sender(out_impl<I, S, Exec>{begin, end, exec});
  163. }
  164. PUSHMI_TEMPLATE(class R, class Exec)
  165. (requires Range<R>&& Sender<Exec, is_single<>, is_executor<>>)
  166. auto operator()(
  167. R&& range,
  168. Exec exec) const {
  169. return (*this)(std::begin(range), std::end(range), exec);
  170. }
  171. } flow_from{};
  172. } // namespace operators
  173. } // namespace pushmi