123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- #pragma once
- /*
- * Copyright 2018-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <folly/experimental/pushmi/executor.h>
- #include <algorithm>
- #include <chrono>
- #include <deque>
- #include <thread>
- namespace pushmi {
// Empty tag type. trampoline<E>::submit has an overload taking a recurse_t
// as its last argument, which flags the owning trampoline to repeat.
struct recurse_t {};

// The single tag value passed wherever a recurse_t is expected.
// (`constexpr` on a variable already implies `const`.)
constexpr recurse_t recurse{};
// Empty marker type; delegator<E> and nester<E> in this file derive from it.
struct _pipeable_sender_ {};
- namespace detail {
- PUSHMI_INLINE_VAR constexpr struct ownordelegate_t {
- } const ownordelegate{};
- PUSHMI_INLINE_VAR constexpr struct ownornest_t {
- } const ownornest{};
// Opaque identifier made of the id of the thread that constructed it and the
// numeric value of the pointer it was constructed from.
class trampoline_id {
  std::thread::id threadid;
  uintptr_t trampolineid;

 public:
  // Records the calling thread's id together with the address held in
  // `trampoline`. The pointer is only converted to an integer; it is never
  // dereferenced, so a null pointer is acceptable.
  template <class T>
  explicit trampoline_id(T* trampoline)
      : threadid(std::this_thread::get_id()),
        // Pointers do not convert implicitly to integers, so an explicit
        // reinterpret_cast is needed for this constructor to compile once it
        // is instantiated.
        trampolineid(reinterpret_cast<uintptr_t>(trampoline)) {}
};
- template <class E = std::exception_ptr>
- class trampoline;
- template <class E = std::exception_ptr>
- class delegator : _pipeable_sender_ {
- public:
- using properties = property_set<
- is_sender<>,
- is_executor<>,
- is_maybe_blocking<>,
- is_fifo_sequence<>,
- is_single<>>;
- delegator executor() {
- return {};
- }
- PUSHMI_TEMPLATE(class SingleReceiver)
- (requires ReceiveValue<
- remove_cvref_t<SingleReceiver>,
- any_executor_ref<E>>)
- void submit(SingleReceiver&& what) {
- trampoline<E>::submit(ownordelegate, std::forward<SingleReceiver>(what));
- }
- };
- template <class E = std::exception_ptr>
- class nester : _pipeable_sender_ {
- public:
- using properties = property_set<
- is_sender<>,
- is_executor<>,
- is_maybe_blocking<>,
- is_fifo_sequence<>,
- is_single<>>;
- nester executor() {
- return {};
- }
- PUSHMI_TEMPLATE(class SingleReceiver)
- (requires ReceiveValue<
- remove_cvref_t<SingleReceiver>,
- any_executor_ref<E>>)
- void submit(SingleReceiver&& what) {
- trampoline<E>::submit(ownornest, std::forward<SingleReceiver>(what));
- }
- };
- template <class E>
- class trampoline {
- private:
- using error_type = std::decay_t<E>;
- using work_type = any_receiver<error_type, any_executor_ref<error_type>>;
- using queue_type = std::deque<work_type>;
- using pending_type = std::tuple<int, queue_type, bool>;
- inline static pending_type*& owner() {
- static thread_local pending_type* pending = nullptr;
- return pending;
- }
- inline static int& depth(pending_type& p) {
- return std::get<0>(p);
- }
- inline static queue_type& pending(pending_type& p) {
- return std::get<1>(p);
- }
- inline static bool& repeat(pending_type& p) {
- return std::get<2>(p);
- }
- public:
- inline static trampoline_id get_id() {
- return {owner()};
- }
- inline static bool is_owned() {
- return owner() != nullptr;
- }
- template <class Selector, class Derived>
- static void submit(Selector, Derived&, recurse_t) {
- if (!is_owned()) {
- abort();
- }
- repeat(*owner()) = true;
- }
- PUSHMI_TEMPLATE(class SingleReceiver)
- (requires not Same<SingleReceiver, recurse_t>)
- static void submit(
- ownordelegate_t,
- SingleReceiver awhat) {
- delegator<E> that;
- if (is_owned()) {
- // thread already owned
- // poor mans scope guard
- try {
- if (++depth(*owner()) > 100) {
- // defer work to owner
- pending(*owner()).push_back(work_type{std::move(awhat)});
- } else {
- // dynamic recursion - optimization to balance queueing and
- // stack usage and value interleaving on the same thread.
- ::pushmi::set_value(awhat, that);
- ::pushmi::set_done(awhat);
- }
- } catch (...) {
- --depth(*owner());
- throw;
- }
- --depth(*owner());
- return;
- }
- // take over the thread
- pending_type pending_store;
- owner() = &pending_store;
- depth(pending_store) = 0;
- repeat(pending_store) = false;
- // poor mans scope guard
- try {
- trampoline<E>::submit(ownornest, std::move(awhat));
- } catch (...) {
- // ignore exceptions while delivering the exception
- try {
- ::pushmi::set_error(awhat, std::current_exception());
- for (auto& what : pending(pending_store)) {
- ::pushmi::set_error(what, std::current_exception());
- }
- } catch (...) {
- }
- pending(pending_store).clear();
- if (!is_owned()) {
- std::abort();
- }
- if (!pending(pending_store).empty()) {
- std::abort();
- }
- owner() = nullptr;
- throw;
- }
- if (!is_owned()) {
- std::abort();
- }
- if (!pending(pending_store).empty()) {
- std::abort();
- }
- owner() = nullptr;
- }
- PUSHMI_TEMPLATE(class SingleReceiver)
- (requires not Same<SingleReceiver, recurse_t>)
- static void submit(
- ownornest_t,
- SingleReceiver awhat) {
- delegator<E> that;
- if (!is_owned()) {
- trampoline<E>::submit(ownordelegate, std::move(awhat));
- return;
- }
- auto& pending_store = *owner();
- // static recursion - tail call optimization
- if (pending(pending_store).empty()) {
- bool go = true;
- while (go) {
- repeat(pending_store) = false;
- ::pushmi::set_value(awhat, that);
- ::pushmi::set_done(awhat);
- go = repeat(pending_store);
- }
- } else {
- pending(pending_store).push_back(work_type{std::move(awhat)});
- }
- if (pending(pending_store).empty()) {
- return;
- }
- while (!pending(pending_store).empty()) {
- auto what = std::move(pending(pending_store).front());
- pending(pending_store).pop_front();
- ::pushmi::set_value(what, any_executor_ref<error_type>{that});
- ::pushmi::set_done(what);
- }
- }
- };
- } // namespace detail
- template <class E = std::exception_ptr>
- detail::trampoline_id get_trampoline_id() {
- if (!detail::trampoline<E>::is_owned()) {
- std::abort();
- }
- return detail::trampoline<E>::get_id();
- }
- template <class E = std::exception_ptr>
- bool owned_by_trampoline() {
- return detail::trampoline<E>::is_owned();
- }
- template <class E = std::exception_ptr>
- inline detail::delegator<E> trampoline() {
- return {};
- }
- template <class E = std::exception_ptr>
- inline detail::nester<E> nested_trampoline() {
- return {};
- }
- // see boosters.h
- struct trampolineEXF {
- auto operator()() {
- return trampoline();
- }
- };
- namespace detail {
- PUSHMI_TEMPLATE(class E)
- (requires SenderTo<delegator<E>, recurse_t>)
- decltype(auto) repeat(delegator<E>& exec) {
- ::pushmi::submit(exec, recurse);
- }
// Catch-all overload for any other executor type: repeating is not
// supported, so the process is aborted.
template <class AnyExec>
[[noreturn]] void repeat(AnyExec& /*unsupported*/) {
  std::abort();
}
- } // namespace detail
- inline auto repeat() {
- return [](auto& exec) { detail::repeat(exec); };
- }
- } // namespace pushmi
|