123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- /*
- * Copyright 2018-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <vector>
- #include <folly/experimental/pushmi/o/defer.h>
- #include <folly/experimental/pushmi/o/for_each.h>
- #include <folly/experimental/pushmi/o/from.h>
- #include <folly/experimental/pushmi/o/just.h>
- #include <folly/experimental/pushmi/o/on.h>
- #include <folly/experimental/pushmi/o/submit.h>
- #include <folly/experimental/pushmi/o/tap.h>
- #include <folly/experimental/pushmi/o/transform.h>
- #include <folly/experimental/pushmi/o/via.h>
- #include <folly/experimental/pushmi/new_thread.h>
- #include <folly/experimental/pushmi/time_source.h>
- #include <folly/experimental/pushmi/trampoline.h>
- #include <folly/experimental/pushmi/entangle.h>
- #include <folly/experimental/pushmi/receiver.h>
- #include <folly/experimental/pushmi/pool.h>
- using namespace pushmi::aliases;
- template <class R>
- struct countdown {
- explicit countdown(std::atomic<int>& c) : counter(&c) {}
- using properties = mi::properties_t<decltype(R{}())>;
- std::atomic<int>* counter;
- template <class ExecutorRef>
- void value(ExecutorRef exec);
- template <class E>
- void error(E e) {
- std::abort();
- }
- void done() {}
- PUSHMI_TEMPLATE(class Up)
- (requires mi::Invocable<
- decltype(mi::set_value),
- Up,
- std::ptrdiff_t>)void starting(Up up) {
- mi::set_value(up, 1);
- }
- PUSHMI_TEMPLATE(class Up)
- (requires mi::True<>&& mi::
- Invocable<decltype(mi::set_value), Up>)void starting(Up up) volatile {
- mi::set_value(up);
- }
- };
- template <class R>
- template <class ExecutorRef>
- void countdown<R>::value(ExecutorRef exec) {
- if (--*counter >= 0) {
- exec | op::submit(R{}(*this));
- }
- }
- using countdownsingle = countdown<decltype(mi::make_receiver)>;
- using countdownflowsingle = countdown<decltype(mi::make_flow_receiver)>;
- using countdownmany = countdown<decltype(mi::make_receiver)>;
- using countdownflowmany = countdown<decltype(mi::make_flow_receiver)>;
- struct inline_time_executor {
- using properties = mi::property_set<
- mi::is_time<>,
- mi::is_executor<>,
- mi::is_fifo_sequence<>,
- mi::is_always_blocking<>,
- mi::is_single<>>;
- std::chrono::system_clock::time_point top() {
- return std::chrono::system_clock::now();
- }
- auto executor() {
- return *this;
- }
- template <class Out>
- void submit(std::chrono::system_clock::time_point at, Out out) {
- std::this_thread::sleep_until(at);
- ::mi::set_value(out, *this);
- }
- };
- struct inline_executor {
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_fifo_sequence<>,
- mi::is_always_blocking<>,
- mi::is_single<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- ::mi::set_value(out, *this);
- }
- };
- template <class CancellationFactory>
- struct inline_executor_flow_single {
- CancellationFactory cf;
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_flow<>,
- mi::is_fifo_sequence<>,
- mi::is_maybe_blocking<>,
- mi::is_single<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- auto tokens = cf();
- using Stopper = decltype(tokens.second);
- struct Data : mi::receiver<> {
- explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
- Stopper stopper;
- };
- auto up = mi::MAKE(receiver)(
- Data{std::move(tokens.second)},
- [](auto& data) {},
- [](auto& data, auto e) noexcept {
- auto both = lock_both(data.stopper);
- (*(both.first))(both.second);
- },
- [](auto& data) {
- auto both = lock_both(data.stopper);
- (*(both.first))(both.second);
- });
- // pass reference for cancellation.
- ::mi::set_starting(out, std::move(up));
- auto both = lock_both(tokens.first);
- if (!!both.first && !*(both.first)) {
- ::mi::set_value(out, *this);
- } else {
- // cancellation is not an error
- ::mi::set_done(out);
- }
- }
- };
- struct shared_cancellation_factory {
- auto operator()() {
- // boolean cancellation
- bool stop = false;
- auto set_stop = [](auto& stop) {
- if (!!stop) {
- *stop = true;
- }
- };
- return mi::shared_entangle(stop, set_stop);
- }
- };
- using inline_executor_flow_single_shared =
- inline_executor_flow_single<shared_cancellation_factory>;
- struct entangled_cancellation_factory {
- auto operator()() {
- // boolean cancellation
- bool stop = false;
- auto set_stop = [](auto& stop) {
- if (!!stop) {
- *stop = true;
- }
- };
- return mi::entangle(stop, set_stop);
- }
- };
- using inline_executor_flow_single_entangled =
- inline_executor_flow_single<entangled_cancellation_factory>;
- struct inline_executor_flow_single_ignore {
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_flow<>,
- mi::is_fifo_sequence<>,
- mi::is_maybe_blocking<>,
- mi::is_single<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- // pass reference for cancellation.
- ::mi::set_starting(out, mi::receiver<>{});
- ::mi::set_value(out, *this);
- }
- };
- struct inline_executor_flow_many {
- inline_executor_flow_many() : counter(nullptr) {}
- inline_executor_flow_many(std::atomic<int>& c) : counter(&c) {}
- std::atomic<int>* counter;
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_flow<>,
- mi::is_fifo_sequence<>,
- mi::is_maybe_blocking<>,
- mi::is_many<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- // boolean cancellation
- struct producer {
- producer(Out out, bool s) : out(std::move(out)), stop(s) {}
- Out out;
- std::atomic<bool> stop;
- };
- auto p = std::make_shared<producer>(std::move(out), false);
- struct Data : mi::receiver<> {
- explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
- std::shared_ptr<producer> p;
- };
- auto up = mi::MAKE(receiver)(
- Data{p},
- [counter = this->counter](auto& data, auto requested) {
- if (requested < 1) {
- return;
- }
- // this is re-entrant
- while (!data.p->stop && --requested >= 0 &&
- (!counter || --*counter >= 0)) {
- ::mi::set_value(
- data.p->out,
- !!counter ? inline_executor_flow_many{*counter}
- : inline_executor_flow_many{});
- }
- if (!counter || *counter == 0) {
- ::mi::set_done(data.p->out);
- }
- },
- [](auto& data, auto e) noexcept {
- data.p->stop.store(true);
- ::mi::set_done(data.p->out);
- },
- [](auto& data) {
- data.p->stop.store(true);
- ::mi::set_done(data.p->out);
- });
- // pass reference for cancellation.
- ::mi::set_starting(p->out, std::move(up));
- }
- };
- struct inline_executor_flow_many_ignore {
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_flow<>,
- mi::is_fifo_sequence<>,
- mi::is_always_blocking<>,
- mi::is_many<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- // pass reference for cancellation.
- ::mi::set_starting(out, mi::receiver<>{});
- ::mi::set_value(out, *this);
- ::mi::set_done(out);
- }
- };
- struct inline_executor_many {
- using properties = mi::property_set<
- mi::is_sender<>,
- mi::is_fifo_sequence<>,
- mi::is_always_blocking<>,
- mi::is_many<>>;
- auto executor() {
- return inline_time_executor{};
- }
- template <class Out>
- void submit(Out out) {
- ::mi::set_value(out, *this);
- ::mi::set_done(out);
- }
- };
- #define concept Concept
- #include <nonius/nonius.h++>
- NONIUS_BENCHMARK("ready 1'000 single get (submit)", [](nonius::chronometer meter){
- int counter{0};
- meter.measure([&]{
- counter = 1'000;
- while (--counter >=0) {
- auto fortyTwo = op::just(42) | op::get<int>;
- }
- return counter;
- });
- })
- NONIUS_BENCHMARK("ready 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
- int counter{0};
- meter.measure([&]{
- counter = 1'000;
- while (--counter >=0) {
- auto fortyTwo = mi::make_single_sender([](auto out){ mi::set_value(out, 42); mi::set_done(out);}) | op::get<int>;
- }
- return counter;
- });
- })
- NONIUS_BENCHMARK("inline 1'000 single", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor{};
- using IE = decltype(ie);
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_receiver(single));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 time single", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_time_executor{};
- using IE = decltype(ie);
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_receiver(single));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 many", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_many{};
- using IE = decltype(ie);
- countdownmany many{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_receiver(many));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 flow_single shared", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_single_shared{};
- using IE = decltype(ie);
- countdownflowsingle flowsingle{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(flowsingle));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 flow_single entangle", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_single_entangled{};
- using IE = decltype(ie);
- countdownflowsingle flowsingle{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(flowsingle));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 flow_single ignore cancellation", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_single_ignore{};
- using IE = decltype(ie);
- countdownflowsingle flowsingle{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(flowsingle));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 flow_many", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_many{};
- using IE = decltype(ie);
- countdownflowmany flowmany{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(flowmany));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_many{counter};
- using IE = decltype(ie);
- meter.measure([&]{
- counter.store(1'000);
- ie | op::for_each(mi::make_receiver());
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1'000", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_many{counter};
- using IE = decltype(ie);
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(mi::ignoreNF{}, mi::abortEF{}, mi::ignoreDF{}, [](auto up){
- mi::set_value(up, 1'000);
- }));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("inline 1'000 flow_many ignore cancellation", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto ie = inline_executor_flow_many_ignore{};
- using IE = decltype(ie);
- countdownflowmany flowmany{counter};
- meter.measure([&]{
- counter.store(1'000);
- ie | op::submit(mi::make_flow_receiver(flowmany));
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("trampoline 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
- int counter{0};
- auto tr = mi::trampoline();
- using TR = decltype(tr);
- meter.measure([&]{
- counter = 1'000;
- while (--counter >=0) {
- auto fortyTwo = tr | op::transform([](auto){return 42;}) | op::get<int>;
- }
- return counter;
- });
- })
- NONIUS_BENCHMARK("trampoline static derecursion 1'000", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto tr = mi::trampoline();
- using TR = decltype(tr);
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- tr | op::submit(single);
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("trampoline virtual derecursion 1'000", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto tr = mi::trampoline();
- using TR = decltype(tr);
- auto single = countdownsingle{counter};
- std::function<void(mi::any_executor_ref<>)> recurse{[&](auto exec){::pushmi::set_value(single, exec);}};
- meter.measure([&]{
- counter.store(1'000);
- tr | op::submit([&](auto exec) { recurse(exec); });
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("trampoline flow_many_sender 1'000", [](nonius::chronometer meter){
- std::atomic<int> counter{0};
- auto tr = mi::trampoline();
- using TR = decltype(tr);
- std::vector<int> values(1'000);
- std::iota(values.begin(), values.end(), 1);
- auto f = op::flow_from(values, tr) | op::tap([&](int){
- --counter;
- });
- meter.measure([&]{
- counter.store(1'000);
- f | op::for_each(mi::make_receiver());
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("pool{1} submit 1'000", [](nonius::chronometer meter){
- mi::pool pl{std::max(1u,std::thread::hardware_concurrency())};
- auto pe = pl.executor();
- using PE = decltype(pe);
- std::atomic<int> counter{0};
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- pe | op::submit(single);
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("pool{hardware_concurrency} submit 1'000", [](nonius::chronometer meter){
- mi::pool pl{std::min(1u,std::thread::hardware_concurrency())};
- auto pe = pl.executor();
- using PE = decltype(pe);
- std::atomic<int> counter{0};
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- pe | op::submit(single);
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("new thread submit 1'000", [](nonius::chronometer meter){
- auto nt = mi::new_thread();
- using NT = decltype(nt);
- std::atomic<int> counter{0};
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- nt | op::submit(single);
- while(counter.load() > 0);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("new thread blocking_submit 1'000", [](nonius::chronometer meter){
- auto nt = mi::new_thread();
- using NT = decltype(nt);
- std::atomic<int> counter{0};
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- nt | op::blocking_submit(single);
- return counter.load();
- });
- })
- NONIUS_BENCHMARK("new thread + time submit 1'000", [](nonius::chronometer meter){
- auto nt = mi::new_thread();
- using NT = decltype(nt);
- auto time = mi::time_source<>{};
- auto tnt = time.make(mi::systemNowF{}, [nt](){ return nt; })();
- using TNT = decltype(tnt);
- std::atomic<int> counter{0};
- countdownsingle single{counter};
- meter.measure([&]{
- counter.store(1'000);
- tnt | op::submit(single);
- while(counter.load() > 0);
- return counter.load();
- });
- time.join();
- })
|