trampoline.h 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. #pragma once
  2. /*
  3. * Copyright 2018-present Facebook, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #include <folly/experimental/pushmi/executor.h>
  18. #include <algorithm>
  19. #include <chrono>
  20. #include <deque>
  21. #include <thread>
  22. namespace pushmi {
  23. struct recurse_t {};
  24. constexpr const recurse_t recurse{};
  25. struct _pipeable_sender_ {};
  26. namespace detail {
  27. PUSHMI_INLINE_VAR constexpr struct ownordelegate_t {
  28. } const ownordelegate{};
  29. PUSHMI_INLINE_VAR constexpr struct ownornest_t {
  30. } const ownornest{};
  31. class trampoline_id {
  32. std::thread::id threadid;
  33. uintptr_t trampolineid;
  34. public:
  35. template <class T>
  36. explicit trampoline_id(T* trampoline)
  37. : threadid(std::this_thread::get_id()), trampolineid(trampoline) {}
  38. };
  39. template <class E = std::exception_ptr>
  40. class trampoline;
  41. template <class E = std::exception_ptr>
  42. class delegator : _pipeable_sender_ {
  43. public:
  44. using properties = property_set<
  45. is_sender<>,
  46. is_executor<>,
  47. is_maybe_blocking<>,
  48. is_fifo_sequence<>,
  49. is_single<>>;
  50. delegator executor() {
  51. return {};
  52. }
  53. PUSHMI_TEMPLATE(class SingleReceiver)
  54. (requires ReceiveValue<
  55. remove_cvref_t<SingleReceiver>,
  56. any_executor_ref<E>>)
  57. void submit(SingleReceiver&& what) {
  58. trampoline<E>::submit(ownordelegate, std::forward<SingleReceiver>(what));
  59. }
  60. };
  61. template <class E = std::exception_ptr>
  62. class nester : _pipeable_sender_ {
  63. public:
  64. using properties = property_set<
  65. is_sender<>,
  66. is_executor<>,
  67. is_maybe_blocking<>,
  68. is_fifo_sequence<>,
  69. is_single<>>;
  70. nester executor() {
  71. return {};
  72. }
  73. PUSHMI_TEMPLATE(class SingleReceiver)
  74. (requires ReceiveValue<
  75. remove_cvref_t<SingleReceiver>,
  76. any_executor_ref<E>>)
  77. void submit(SingleReceiver&& what) {
  78. trampoline<E>::submit(ownornest, std::forward<SingleReceiver>(what));
  79. }
  80. };
  81. template <class E>
  82. class trampoline {
  83. private:
  84. using error_type = std::decay_t<E>;
  85. using work_type = any_receiver<error_type, any_executor_ref<error_type>>;
  86. using queue_type = std::deque<work_type>;
  87. using pending_type = std::tuple<int, queue_type, bool>;
  88. inline static pending_type*& owner() {
  89. static thread_local pending_type* pending = nullptr;
  90. return pending;
  91. }
  92. inline static int& depth(pending_type& p) {
  93. return std::get<0>(p);
  94. }
  95. inline static queue_type& pending(pending_type& p) {
  96. return std::get<1>(p);
  97. }
  98. inline static bool& repeat(pending_type& p) {
  99. return std::get<2>(p);
  100. }
  101. public:
  102. inline static trampoline_id get_id() {
  103. return {owner()};
  104. }
  105. inline static bool is_owned() {
  106. return owner() != nullptr;
  107. }
  108. template <class Selector, class Derived>
  109. static void submit(Selector, Derived&, recurse_t) {
  110. if (!is_owned()) {
  111. abort();
  112. }
  113. repeat(*owner()) = true;
  114. }
  115. PUSHMI_TEMPLATE(class SingleReceiver)
  116. (requires not Same<SingleReceiver, recurse_t>)
  117. static void submit(
  118. ownordelegate_t,
  119. SingleReceiver awhat) {
  120. delegator<E> that;
  121. if (is_owned()) {
  122. // thread already owned
  123. // poor mans scope guard
  124. try {
  125. if (++depth(*owner()) > 100) {
  126. // defer work to owner
  127. pending(*owner()).push_back(work_type{std::move(awhat)});
  128. } else {
  129. // dynamic recursion - optimization to balance queueing and
  130. // stack usage and value interleaving on the same thread.
  131. ::pushmi::set_value(awhat, that);
  132. ::pushmi::set_done(awhat);
  133. }
  134. } catch (...) {
  135. --depth(*owner());
  136. throw;
  137. }
  138. --depth(*owner());
  139. return;
  140. }
  141. // take over the thread
  142. pending_type pending_store;
  143. owner() = &pending_store;
  144. depth(pending_store) = 0;
  145. repeat(pending_store) = false;
  146. // poor mans scope guard
  147. try {
  148. trampoline<E>::submit(ownornest, std::move(awhat));
  149. } catch (...) {
  150. // ignore exceptions while delivering the exception
  151. try {
  152. ::pushmi::set_error(awhat, std::current_exception());
  153. for (auto& what : pending(pending_store)) {
  154. ::pushmi::set_error(what, std::current_exception());
  155. }
  156. } catch (...) {
  157. }
  158. pending(pending_store).clear();
  159. if (!is_owned()) {
  160. std::abort();
  161. }
  162. if (!pending(pending_store).empty()) {
  163. std::abort();
  164. }
  165. owner() = nullptr;
  166. throw;
  167. }
  168. if (!is_owned()) {
  169. std::abort();
  170. }
  171. if (!pending(pending_store).empty()) {
  172. std::abort();
  173. }
  174. owner() = nullptr;
  175. }
  176. PUSHMI_TEMPLATE(class SingleReceiver)
  177. (requires not Same<SingleReceiver, recurse_t>)
  178. static void submit(
  179. ownornest_t,
  180. SingleReceiver awhat) {
  181. delegator<E> that;
  182. if (!is_owned()) {
  183. trampoline<E>::submit(ownordelegate, std::move(awhat));
  184. return;
  185. }
  186. auto& pending_store = *owner();
  187. // static recursion - tail call optimization
  188. if (pending(pending_store).empty()) {
  189. bool go = true;
  190. while (go) {
  191. repeat(pending_store) = false;
  192. ::pushmi::set_value(awhat, that);
  193. ::pushmi::set_done(awhat);
  194. go = repeat(pending_store);
  195. }
  196. } else {
  197. pending(pending_store).push_back(work_type{std::move(awhat)});
  198. }
  199. if (pending(pending_store).empty()) {
  200. return;
  201. }
  202. while (!pending(pending_store).empty()) {
  203. auto what = std::move(pending(pending_store).front());
  204. pending(pending_store).pop_front();
  205. ::pushmi::set_value(what, any_executor_ref<error_type>{that});
  206. ::pushmi::set_done(what);
  207. }
  208. }
  209. };
  210. } // namespace detail
  211. template <class E = std::exception_ptr>
  212. detail::trampoline_id get_trampoline_id() {
  213. if (!detail::trampoline<E>::is_owned()) {
  214. std::abort();
  215. }
  216. return detail::trampoline<E>::get_id();
  217. }
  218. template <class E = std::exception_ptr>
  219. bool owned_by_trampoline() {
  220. return detail::trampoline<E>::is_owned();
  221. }
  222. template <class E = std::exception_ptr>
  223. inline detail::delegator<E> trampoline() {
  224. return {};
  225. }
  226. template <class E = std::exception_ptr>
  227. inline detail::nester<E> nested_trampoline() {
  228. return {};
  229. }
  230. // see boosters.h
  231. struct trampolineEXF {
  232. auto operator()() {
  233. return trampoline();
  234. }
  235. };
  236. namespace detail {
  237. PUSHMI_TEMPLATE(class E)
  238. (requires SenderTo<delegator<E>, recurse_t>)
  239. decltype(auto) repeat(delegator<E>& exec) {
  240. ::pushmi::submit(exec, recurse);
  241. }
  242. template <class AnyExec>
  243. [[noreturn]] void repeat(AnyExec&) {
  244. std::abort();
  245. }
  246. } // namespace detail
  247. inline auto repeat() {
  248. return [](auto& exec) { detail::repeat(exec); };
  249. }
  250. } // namespace pushmi