submit.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. #pragma once
  2. /*
  3. * Copyright 2018-present Facebook, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #include <folly/experimental/pushmi/boosters.h>
  18. #include <folly/experimental/pushmi/detail/if_constexpr.h>
  19. #include <folly/experimental/pushmi/detail/opt.h>
  20. #include <folly/experimental/pushmi/o/extension_operators.h>
  21. #include <folly/experimental/pushmi/time_single_sender.h>
  22. #include <folly/experimental/pushmi/trampoline.h>
  23. #include <functional>
  24. namespace pushmi {
  25. namespace detail {
  26. namespace submit_detail {
  27. PUSHMI_CONCEPT_DEF(
  28. template(class In, class... AN)(
  29. concept AutoSenderTo)(In, AN...),
  30. Sender<In>&& SenderTo<In, receiver_type_t<In, AN...>>
  31. );
  32. PUSHMI_CONCEPT_DEF(
  33. template(class In, class... AN)(
  34. concept AutoConstrainedSenderTo)(In, AN...),
  35. ConstrainedSenderTo<In, receiver_type_t<In, AN...>>
  36. );
  37. PUSHMI_CONCEPT_DEF(
  38. template(class In, class... AN)(
  39. concept AutoTimeSenderTo)(In, AN...),
  40. TimeSenderTo<In, receiver_type_t<In, AN...>>
  41. );
  42. } // namespace submit_detail
  43. struct submit_fn {
  44. private:
  45. // TODO - only move, move-only types..
  46. // if out can be copied, then submit can be called multiple
  47. // times..
  48. template <class... AN>
  49. struct fn {
  50. std::tuple<AN...> args_;
  51. PUSHMI_TEMPLATE(class In)
  52. (requires submit_detail::AutoSenderTo<In, AN...>)
  53. In operator()(In in) {
  54. auto out{::pushmi::detail::receiver_from_fn<In>{}(std::move(args_))};
  55. ::pushmi::submit(in, std::move(out));
  56. return in;
  57. }
  58. };
  59. public:
  60. template <class... AN>
  61. auto operator()(AN&&... an) const {
  62. return submit_fn::fn<AN...>{std::tuple<AN...>{(AN &&) an...}};
  63. }
  64. };
  65. struct submit_at_fn {
  66. private:
  67. template <class TP, class... AN>
  68. struct fn {
  69. TP at_;
  70. std::tuple<AN...> args_;
  71. PUSHMI_TEMPLATE(class In)
  72. (requires submit_detail::AutoTimeSenderTo<In, AN...>)
  73. In operator()(In in) {
  74. auto out{::pushmi::detail::receiver_from_fn<In>()(std::move(args_))};
  75. ::pushmi::submit(in, std::move(at_), std::move(out));
  76. return in;
  77. }
  78. };
  79. public:
  80. PUSHMI_TEMPLATE(class TP, class... AN)
  81. (requires Regular<TP>)
  82. auto operator()(TP at, AN... an) const {
  83. return submit_at_fn::fn<TP, AN...>{std::move(at), {(AN &&) an...}};
  84. }
  85. };
  86. struct submit_after_fn {
  87. private:
  88. template <class D, class... AN>
  89. struct fn {
  90. D after_;
  91. std::tuple<AN...> args_;
  92. PUSHMI_TEMPLATE(class In)
  93. (requires submit_detail::AutoTimeSenderTo<In, AN...>)
  94. In operator()(In in) {
  95. // TODO - only move, move-only types..
  96. // if out can be copied, then submit can be called multiple
  97. // times..
  98. auto out{::pushmi::detail::receiver_from_fn<In>()(std::move(args_))};
  99. auto at = ::pushmi::now(in) + std::move(after_);
  100. ::pushmi::submit(in, std::move(at), std::move(out));
  101. return in;
  102. }
  103. };
  104. public:
  105. PUSHMI_TEMPLATE(class D, class... AN)
  106. (requires Regular<D>)
  107. auto operator()(D after, AN... an) const {
  108. return submit_after_fn::fn<D, AN...>{std::move(after), {(AN &&) an...}};
  109. }
  110. };
  111. struct blocking_submit_fn {
  112. private:
  113. struct lock_state {
  114. bool done{false};
  115. std::atomic<int> nested{0};
  116. std::mutex lock;
  117. std::condition_variable signaled;
  118. };
  119. template <class Out>
  120. struct nested_receiver_impl;
  121. PUSHMI_TEMPLATE(class Exec)
  122. (requires Sender<Exec>&& Executor<Exec>)
  123. struct nested_executor_impl {
  124. nested_executor_impl(lock_state* state, Exec ex)
  125. : state_(state), ex_(std::move(ex)) {}
  126. lock_state* state_;
  127. Exec ex_;
  128. template <class U>
  129. using test_for_this = nested_executor_impl<U>;
  130. PUSHMI_TEMPLATE(class Ex)
  131. (requires Sender<Ex>&& Executor<Ex>&&
  132. detail::is_v<Ex, test_for_this>)
  133. static auto make(lock_state*, Ex ex) {
  134. return ex;
  135. }
  136. PUSHMI_TEMPLATE(class Ex)
  137. (requires Sender<Ex>&& Executor<Ex> &&
  138. not detail::is_v<Ex, test_for_this>)
  139. static auto make(lock_state* state, Ex ex) {
  140. return nested_executor_impl<Ex>{state, ex};
  141. }
  142. using properties = properties_t<Exec>;
  143. auto executor() {
  144. return make(state_, ::pushmi::executor(ex_));
  145. }
  146. PUSHMI_TEMPLATE(class... ZN)
  147. (requires Constrained<Exec>)
  148. auto top() {
  149. return ::pushmi::top(ex_);
  150. }
  151. PUSHMI_TEMPLATE(class CV, class Out)
  152. (requires Receiver<Out>&& Constrained<Exec>)
  153. void submit(CV cv, Out out) {
  154. ++state_->nested;
  155. ::pushmi::submit(
  156. ex_, cv, nested_receiver_impl<Out>{state_, std::move(out)});
  157. }
  158. PUSHMI_TEMPLATE(class Out)
  159. (requires Receiver<Out> && not Constrained<Exec>)
  160. void submit(Out out) {
  161. ++state_->nested;
  162. ::pushmi::submit(ex_, nested_receiver_impl<Out>{state_, std::move(out)});
  163. }
  164. };
  165. template <class Out>
  166. struct nested_receiver_impl {
  167. nested_receiver_impl(lock_state* state, Out out)
  168. : state_(state), out_(std::move(out)) {}
  169. lock_state* state_;
  170. Out out_;
  171. using properties = properties_t<Out>;
  172. template <class V>
  173. void value(V&& v) {
  174. std::exception_ptr e;
  175. using executor_t = remove_cvref_t<V>;
  176. auto n = nested_executor_impl<executor_t>::make(state_, (V &&) v);
  177. ::pushmi::set_value(out_, any_executor_ref<>{n});
  178. }
  179. template <class E>
  180. void error(E&& e) noexcept {
  181. ::pushmi::set_error(out_, (E &&) e);
  182. if (--state_->nested == 0) {
  183. state_->signaled.notify_all();
  184. }
  185. }
  186. void done() {
  187. std::exception_ptr e;
  188. try {
  189. ::pushmi::set_done(out_);
  190. } catch (...) {
  191. e = std::current_exception();
  192. }
  193. if (--state_->nested == 0) {
  194. state_->signaled.notify_all();
  195. }
  196. if (e) {
  197. std::rethrow_exception(e);
  198. }
  199. }
  200. };
  201. struct nested_executor_impl_fn {
  202. PUSHMI_TEMPLATE(class Exec)
  203. (requires Executor<Exec>)
  204. auto operator()(lock_state* state, Exec ex) const {
  205. return nested_executor_impl<Exec>::make(state, std::move(ex));
  206. }
  207. };
  208. struct on_value_impl {
  209. lock_state* state_;
  210. PUSHMI_TEMPLATE(class Out, class Value)
  211. (requires Executor<std::decay_t<Value>>&& ReceiveValue<
  212. Out,
  213. pushmi::invoke_result_t<
  214. nested_executor_impl_fn,
  215. lock_state*,
  216. std::decay_t<Value>>>)
  217. void operator()(Out out, Value&& v) const {
  218. ++state_->nested;
  219. ::pushmi::set_value(out, nested_executor_impl_fn{}(state_, (Value &&) v));
  220. if (--state_->nested == 0) {
  221. std::unique_lock<std::mutex> guard{state_->lock};
  222. state_->signaled.notify_all();
  223. }
  224. }
  225. PUSHMI_TEMPLATE(class Out, class... VN)
  226. (requires True<>&& ReceiveValue<Out, VN...> &&
  227. not(sizeof...(VN) == 1 && And<Executor<std::decay_t<VN>>...>))
  228. void operator()(Out out, VN&&... vn) const {
  229. ::pushmi::set_value(out, (VN &&) vn...);
  230. }
  231. };
  232. struct on_error_impl {
  233. lock_state* state_;
  234. PUSHMI_TEMPLATE(class Out, class E)
  235. (requires ReceiveError<Out, E>)
  236. void operator()(Out out, E e) const noexcept {
  237. ::pushmi::set_error(out, std::move(e));
  238. std::unique_lock<std::mutex> guard{state_->lock};
  239. state_->done = true;
  240. state_->signaled.notify_all();
  241. }
  242. };
  243. struct on_done_impl {
  244. lock_state* state_;
  245. PUSHMI_TEMPLATE(class Out)
  246. (requires Receiver<Out>)
  247. void operator()(Out out) const {
  248. ::pushmi::set_done(out);
  249. std::unique_lock<std::mutex> guard{state_->lock};
  250. state_->done = true;
  251. state_->signaled.notify_all();
  252. }
  253. };
  254. template <class In>
  255. struct receiver_impl {
  256. PUSHMI_TEMPLATE(class... AN)
  257. (requires Sender<In>)
  258. auto operator()(
  259. lock_state* state,
  260. std::tuple<AN...> args) const {
  261. return ::pushmi::detail::receiver_from_fn<In>()(
  262. std::move(args),
  263. on_value_impl{state},
  264. on_error_impl{state},
  265. on_done_impl{state});
  266. }
  267. };
  268. template <class In>
  269. struct submit_impl {
  270. PUSHMI_TEMPLATE(class Out)
  271. (requires Receiver<Out>&& SenderTo<In, Out>)
  272. void operator()(In& in, Out out) const {
  273. ::pushmi::submit(in, std::move(out));
  274. }
  275. };
  276. // TODO - only move, move-only types..
  277. // if out can be copied, then submit can be called multiple
  278. // times..
  279. template <class... AN>
  280. struct fn {
  281. std::tuple<AN...> args_;
  282. PUSHMI_TEMPLATE(class In)
  283. (requires Sender<In>&& Invocable<
  284. submit_impl<In>&,
  285. In&,
  286. pushmi::invoke_result_t<
  287. receiver_impl<In>,
  288. lock_state*,
  289. std::tuple<AN...>&&>> &&
  290. not AlwaysBlocking<In>)
  291. In operator()(In in) {
  292. lock_state state{};
  293. auto make = receiver_impl<In>{};
  294. auto submit = submit_impl<In>{};
  295. submit(in, make(&state, std::move(args_)));
  296. std::unique_lock<std::mutex> guard{state.lock};
  297. state.signaled.wait(
  298. guard, [&] { return state.done && state.nested.load() == 0; });
  299. return in;
  300. }
  301. };
  302. public:
  303. template <class... AN>
  304. auto operator()(AN... an) const {
  305. return blocking_submit_fn::fn<AN...>{std::tuple<AN...>{(AN &&) an...}};
  306. }
  307. };
  308. template <class T>
  309. struct get_fn {
  310. private:
  311. struct on_value_impl {
  312. pushmi::detail::opt<T>* result_;
  313. template <class... TN>
  314. void operator()(TN&&... tn) const {
  315. *result_ = T{(TN &&) tn...};
  316. }
  317. };
  318. struct on_error_impl {
  319. pushmi::detail::opt<std::exception_ptr>* ep_;
  320. template <class E>
  321. void operator()(E e) const noexcept {
  322. *ep_ = std::make_exception_ptr(e);
  323. }
  324. void operator()(std::exception_ptr ep) const noexcept {
  325. *ep_ = ep;
  326. }
  327. };
  328. public:
  329. // TODO constrain this better
  330. PUSHMI_TEMPLATE(class In)
  331. (requires Sender<In>)
  332. T operator()(In in) const {
  333. pushmi::detail::opt<T> result_;
  334. pushmi::detail::opt<std::exception_ptr> ep_;
  335. auto out =
  336. ::pushmi::make_receiver(on_value_impl{&result_}, on_error_impl{&ep_});
  337. using Out = decltype(out);
  338. static_assert(
  339. SenderTo<In, Out>,
  340. "'In' does not deliver value compatible with 'T' to 'Out'");
  341. std::conditional_t<AlwaysBlocking<In>, submit_fn, blocking_submit_fn>{}(
  342. std::move(out))(std::move(in));
  343. if (!!ep_) {
  344. std::rethrow_exception(*ep_);
  345. }
  346. return std::move(*result_);
  347. }
  348. };
  349. } // namespace detail
  350. namespace operators {
  351. PUSHMI_INLINE_VAR constexpr detail::submit_fn submit{};
  352. PUSHMI_INLINE_VAR constexpr detail::submit_at_fn submit_at{};
  353. PUSHMI_INLINE_VAR constexpr detail::submit_after_fn submit_after{};
  354. PUSHMI_INLINE_VAR constexpr detail::blocking_submit_fn blocking_submit{};
  355. template <class T>
  356. PUSHMI_INLINE_VAR constexpr detail::get_fn<T> get{};
  357. } // namespace operators
  358. } // namespace pushmi