PushmiBenchmarks.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. /*
  2. * Copyright 2018-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <vector>
  17. #include <folly/experimental/pushmi/o/defer.h>
  18. #include <folly/experimental/pushmi/o/for_each.h>
  19. #include <folly/experimental/pushmi/o/from.h>
  20. #include <folly/experimental/pushmi/o/just.h>
  21. #include <folly/experimental/pushmi/o/on.h>
  22. #include <folly/experimental/pushmi/o/submit.h>
  23. #include <folly/experimental/pushmi/o/tap.h>
  24. #include <folly/experimental/pushmi/o/transform.h>
  25. #include <folly/experimental/pushmi/o/via.h>
  26. #include <folly/experimental/pushmi/new_thread.h>
  27. #include <folly/experimental/pushmi/time_source.h>
  28. #include <folly/experimental/pushmi/trampoline.h>
  29. #include <folly/experimental/pushmi/entangle.h>
  30. #include <folly/experimental/pushmi/receiver.h>
  31. #include <folly/experimental/pushmi/pool.h>
  32. using namespace pushmi::aliases;
  33. template <class R>
  34. struct countdown {
  35. explicit countdown(std::atomic<int>& c) : counter(&c) {}
  36. using properties = mi::properties_t<decltype(R{}())>;
  37. std::atomic<int>* counter;
  38. template <class ExecutorRef>
  39. void value(ExecutorRef exec);
  40. template <class E>
  41. void error(E e) {
  42. std::abort();
  43. }
  44. void done() {}
  45. PUSHMI_TEMPLATE(class Up)
  46. (requires mi::Invocable<
  47. decltype(mi::set_value),
  48. Up,
  49. std::ptrdiff_t>)void starting(Up up) {
  50. mi::set_value(up, 1);
  51. }
  52. PUSHMI_TEMPLATE(class Up)
  53. (requires mi::True<>&& mi::
  54. Invocable<decltype(mi::set_value), Up>)void starting(Up up) volatile {
  55. mi::set_value(up);
  56. }
  57. };
  58. template <class R>
  59. template <class ExecutorRef>
  60. void countdown<R>::value(ExecutorRef exec) {
  61. if (--*counter >= 0) {
  62. exec | op::submit(R{}(*this));
  63. }
  64. }
  65. using countdownsingle = countdown<decltype(mi::make_receiver)>;
  66. using countdownflowsingle = countdown<decltype(mi::make_flow_receiver)>;
  67. using countdownmany = countdown<decltype(mi::make_receiver)>;
  68. using countdownflowmany = countdown<decltype(mi::make_flow_receiver)>;
  69. struct inline_time_executor {
  70. using properties = mi::property_set<
  71. mi::is_time<>,
  72. mi::is_executor<>,
  73. mi::is_fifo_sequence<>,
  74. mi::is_always_blocking<>,
  75. mi::is_single<>>;
  76. std::chrono::system_clock::time_point top() {
  77. return std::chrono::system_clock::now();
  78. }
  79. auto executor() {
  80. return *this;
  81. }
  82. template <class Out>
  83. void submit(std::chrono::system_clock::time_point at, Out out) {
  84. std::this_thread::sleep_until(at);
  85. ::mi::set_value(out, *this);
  86. }
  87. };
  88. struct inline_executor {
  89. using properties = mi::property_set<
  90. mi::is_sender<>,
  91. mi::is_fifo_sequence<>,
  92. mi::is_always_blocking<>,
  93. mi::is_single<>>;
  94. auto executor() {
  95. return inline_time_executor{};
  96. }
  97. template <class Out>
  98. void submit(Out out) {
  99. ::mi::set_value(out, *this);
  100. }
  101. };
  102. template <class CancellationFactory>
  103. struct inline_executor_flow_single {
  104. CancellationFactory cf;
  105. using properties = mi::property_set<
  106. mi::is_sender<>,
  107. mi::is_flow<>,
  108. mi::is_fifo_sequence<>,
  109. mi::is_maybe_blocking<>,
  110. mi::is_single<>>;
  111. auto executor() {
  112. return inline_time_executor{};
  113. }
  114. template <class Out>
  115. void submit(Out out) {
  116. auto tokens = cf();
  117. using Stopper = decltype(tokens.second);
  118. struct Data : mi::receiver<> {
  119. explicit Data(Stopper stopper) : stopper(std::move(stopper)) {}
  120. Stopper stopper;
  121. };
  122. auto up = mi::MAKE(receiver)(
  123. Data{std::move(tokens.second)},
  124. [](auto& data) {},
  125. [](auto& data, auto e) noexcept {
  126. auto both = lock_both(data.stopper);
  127. (*(both.first))(both.second);
  128. },
  129. [](auto& data) {
  130. auto both = lock_both(data.stopper);
  131. (*(both.first))(both.second);
  132. });
  133. // pass reference for cancellation.
  134. ::mi::set_starting(out, std::move(up));
  135. auto both = lock_both(tokens.first);
  136. if (!!both.first && !*(both.first)) {
  137. ::mi::set_value(out, *this);
  138. } else {
  139. // cancellation is not an error
  140. ::mi::set_done(out);
  141. }
  142. }
  143. };
  144. struct shared_cancellation_factory {
  145. auto operator()() {
  146. // boolean cancellation
  147. bool stop = false;
  148. auto set_stop = [](auto& stop) {
  149. if (!!stop) {
  150. *stop = true;
  151. }
  152. };
  153. return mi::shared_entangle(stop, set_stop);
  154. }
  155. };
  156. using inline_executor_flow_single_shared =
  157. inline_executor_flow_single<shared_cancellation_factory>;
  158. struct entangled_cancellation_factory {
  159. auto operator()() {
  160. // boolean cancellation
  161. bool stop = false;
  162. auto set_stop = [](auto& stop) {
  163. if (!!stop) {
  164. *stop = true;
  165. }
  166. };
  167. return mi::entangle(stop, set_stop);
  168. }
  169. };
  170. using inline_executor_flow_single_entangled =
  171. inline_executor_flow_single<entangled_cancellation_factory>;
  172. struct inline_executor_flow_single_ignore {
  173. using properties = mi::property_set<
  174. mi::is_sender<>,
  175. mi::is_flow<>,
  176. mi::is_fifo_sequence<>,
  177. mi::is_maybe_blocking<>,
  178. mi::is_single<>>;
  179. auto executor() {
  180. return inline_time_executor{};
  181. }
  182. template <class Out>
  183. void submit(Out out) {
  184. // pass reference for cancellation.
  185. ::mi::set_starting(out, mi::receiver<>{});
  186. ::mi::set_value(out, *this);
  187. }
  188. };
  189. struct inline_executor_flow_many {
  190. inline_executor_flow_many() : counter(nullptr) {}
  191. inline_executor_flow_many(std::atomic<int>& c) : counter(&c) {}
  192. std::atomic<int>* counter;
  193. using properties = mi::property_set<
  194. mi::is_sender<>,
  195. mi::is_flow<>,
  196. mi::is_fifo_sequence<>,
  197. mi::is_maybe_blocking<>,
  198. mi::is_many<>>;
  199. auto executor() {
  200. return inline_time_executor{};
  201. }
  202. template <class Out>
  203. void submit(Out out) {
  204. // boolean cancellation
  205. struct producer {
  206. producer(Out out, bool s) : out(std::move(out)), stop(s) {}
  207. Out out;
  208. std::atomic<bool> stop;
  209. };
  210. auto p = std::make_shared<producer>(std::move(out), false);
  211. struct Data : mi::receiver<> {
  212. explicit Data(std::shared_ptr<producer> p) : p(std::move(p)) {}
  213. std::shared_ptr<producer> p;
  214. };
  215. auto up = mi::MAKE(receiver)(
  216. Data{p},
  217. [counter = this->counter](auto& data, auto requested) {
  218. if (requested < 1) {
  219. return;
  220. }
  221. // this is re-entrant
  222. while (!data.p->stop && --requested >= 0 &&
  223. (!counter || --*counter >= 0)) {
  224. ::mi::set_value(
  225. data.p->out,
  226. !!counter ? inline_executor_flow_many{*counter}
  227. : inline_executor_flow_many{});
  228. }
  229. if (!counter || *counter == 0) {
  230. ::mi::set_done(data.p->out);
  231. }
  232. },
  233. [](auto& data, auto e) noexcept {
  234. data.p->stop.store(true);
  235. ::mi::set_done(data.p->out);
  236. },
  237. [](auto& data) {
  238. data.p->stop.store(true);
  239. ::mi::set_done(data.p->out);
  240. });
  241. // pass reference for cancellation.
  242. ::mi::set_starting(p->out, std::move(up));
  243. }
  244. };
  245. struct inline_executor_flow_many_ignore {
  246. using properties = mi::property_set<
  247. mi::is_sender<>,
  248. mi::is_flow<>,
  249. mi::is_fifo_sequence<>,
  250. mi::is_always_blocking<>,
  251. mi::is_many<>>;
  252. auto executor() {
  253. return inline_time_executor{};
  254. }
  255. template <class Out>
  256. void submit(Out out) {
  257. // pass reference for cancellation.
  258. ::mi::set_starting(out, mi::receiver<>{});
  259. ::mi::set_value(out, *this);
  260. ::mi::set_done(out);
  261. }
  262. };
  263. struct inline_executor_many {
  264. using properties = mi::property_set<
  265. mi::is_sender<>,
  266. mi::is_fifo_sequence<>,
  267. mi::is_always_blocking<>,
  268. mi::is_many<>>;
  269. auto executor() {
  270. return inline_time_executor{};
  271. }
  272. template <class Out>
  273. void submit(Out out) {
  274. ::mi::set_value(out, *this);
  275. ::mi::set_done(out);
  276. }
  277. };
  278. #define concept Concept
  279. #include <nonius/nonius.h++>
  280. NONIUS_BENCHMARK("ready 1'000 single get (submit)", [](nonius::chronometer meter){
  281. int counter{0};
  282. meter.measure([&]{
  283. counter = 1'000;
  284. while (--counter >=0) {
  285. auto fortyTwo = op::just(42) | op::get<int>;
  286. }
  287. return counter;
  288. });
  289. })
  290. NONIUS_BENCHMARK("ready 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
  291. int counter{0};
  292. meter.measure([&]{
  293. counter = 1'000;
  294. while (--counter >=0) {
  295. auto fortyTwo = mi::make_single_sender([](auto out){ mi::set_value(out, 42); mi::set_done(out);}) | op::get<int>;
  296. }
  297. return counter;
  298. });
  299. })
  300. NONIUS_BENCHMARK("inline 1'000 single", [](nonius::chronometer meter){
  301. std::atomic<int> counter{0};
  302. auto ie = inline_executor{};
  303. using IE = decltype(ie);
  304. countdownsingle single{counter};
  305. meter.measure([&]{
  306. counter.store(1'000);
  307. ie | op::submit(mi::make_receiver(single));
  308. while(counter.load() > 0);
  309. return counter.load();
  310. });
  311. })
  312. NONIUS_BENCHMARK("inline 1'000 time single", [](nonius::chronometer meter){
  313. std::atomic<int> counter{0};
  314. auto ie = inline_time_executor{};
  315. using IE = decltype(ie);
  316. countdownsingle single{counter};
  317. meter.measure([&]{
  318. counter.store(1'000);
  319. ie | op::submit(mi::make_receiver(single));
  320. while(counter.load() > 0);
  321. return counter.load();
  322. });
  323. })
  324. NONIUS_BENCHMARK("inline 1'000 many", [](nonius::chronometer meter){
  325. std::atomic<int> counter{0};
  326. auto ie = inline_executor_many{};
  327. using IE = decltype(ie);
  328. countdownmany many{counter};
  329. meter.measure([&]{
  330. counter.store(1'000);
  331. ie | op::submit(mi::make_receiver(many));
  332. while(counter.load() > 0);
  333. return counter.load();
  334. });
  335. })
  336. NONIUS_BENCHMARK("inline 1'000 flow_single shared", [](nonius::chronometer meter){
  337. std::atomic<int> counter{0};
  338. auto ie = inline_executor_flow_single_shared{};
  339. using IE = decltype(ie);
  340. countdownflowsingle flowsingle{counter};
  341. meter.measure([&]{
  342. counter.store(1'000);
  343. ie | op::submit(mi::make_flow_receiver(flowsingle));
  344. while(counter.load() > 0);
  345. return counter.load();
  346. });
  347. })
  348. NONIUS_BENCHMARK("inline 1'000 flow_single entangle", [](nonius::chronometer meter){
  349. std::atomic<int> counter{0};
  350. auto ie = inline_executor_flow_single_entangled{};
  351. using IE = decltype(ie);
  352. countdownflowsingle flowsingle{counter};
  353. meter.measure([&]{
  354. counter.store(1'000);
  355. ie | op::submit(mi::make_flow_receiver(flowsingle));
  356. while(counter.load() > 0);
  357. return counter.load();
  358. });
  359. })
  360. NONIUS_BENCHMARK("inline 1'000 flow_single ignore cancellation", [](nonius::chronometer meter){
  361. std::atomic<int> counter{0};
  362. auto ie = inline_executor_flow_single_ignore{};
  363. using IE = decltype(ie);
  364. countdownflowsingle flowsingle{counter};
  365. meter.measure([&]{
  366. counter.store(1'000);
  367. ie | op::submit(mi::make_flow_receiver(flowsingle));
  368. while(counter.load() > 0);
  369. return counter.load();
  370. });
  371. })
  372. NONIUS_BENCHMARK("inline 1'000 flow_many", [](nonius::chronometer meter){
  373. std::atomic<int> counter{0};
  374. auto ie = inline_executor_flow_many{};
  375. using IE = decltype(ie);
  376. countdownflowmany flowmany{counter};
  377. meter.measure([&]{
  378. counter.store(1'000);
  379. ie | op::submit(mi::make_flow_receiver(flowmany));
  380. while(counter.load() > 0);
  381. return counter.load();
  382. });
  383. })
  384. NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1", [](nonius::chronometer meter){
  385. std::atomic<int> counter{0};
  386. auto ie = inline_executor_flow_many{counter};
  387. using IE = decltype(ie);
  388. meter.measure([&]{
  389. counter.store(1'000);
  390. ie | op::for_each(mi::make_receiver());
  391. while(counter.load() > 0);
  392. return counter.load();
  393. });
  394. })
  395. NONIUS_BENCHMARK("inline 1 flow_many with 1'000 values pull 1'000", [](nonius::chronometer meter){
  396. std::atomic<int> counter{0};
  397. auto ie = inline_executor_flow_many{counter};
  398. using IE = decltype(ie);
  399. meter.measure([&]{
  400. counter.store(1'000);
  401. ie | op::submit(mi::make_flow_receiver(mi::ignoreNF{}, mi::abortEF{}, mi::ignoreDF{}, [](auto up){
  402. mi::set_value(up, 1'000);
  403. }));
  404. while(counter.load() > 0);
  405. return counter.load();
  406. });
  407. })
  408. NONIUS_BENCHMARK("inline 1'000 flow_many ignore cancellation", [](nonius::chronometer meter){
  409. std::atomic<int> counter{0};
  410. auto ie = inline_executor_flow_many_ignore{};
  411. using IE = decltype(ie);
  412. countdownflowmany flowmany{counter};
  413. meter.measure([&]{
  414. counter.store(1'000);
  415. ie | op::submit(mi::make_flow_receiver(flowmany));
  416. while(counter.load() > 0);
  417. return counter.load();
  418. });
  419. })
  420. NONIUS_BENCHMARK("trampoline 1'000 single get (blocking_submit)", [](nonius::chronometer meter){
  421. int counter{0};
  422. auto tr = mi::trampoline();
  423. using TR = decltype(tr);
  424. meter.measure([&]{
  425. counter = 1'000;
  426. while (--counter >=0) {
  427. auto fortyTwo = tr | op::transform([](auto){return 42;}) | op::get<int>;
  428. }
  429. return counter;
  430. });
  431. })
  432. NONIUS_BENCHMARK("trampoline static derecursion 1'000", [](nonius::chronometer meter){
  433. std::atomic<int> counter{0};
  434. auto tr = mi::trampoline();
  435. using TR = decltype(tr);
  436. countdownsingle single{counter};
  437. meter.measure([&]{
  438. counter.store(1'000);
  439. tr | op::submit(single);
  440. while(counter.load() > 0);
  441. return counter.load();
  442. });
  443. })
  444. NONIUS_BENCHMARK("trampoline virtual derecursion 1'000", [](nonius::chronometer meter){
  445. std::atomic<int> counter{0};
  446. auto tr = mi::trampoline();
  447. using TR = decltype(tr);
  448. auto single = countdownsingle{counter};
  449. std::function<void(mi::any_executor_ref<>)> recurse{[&](auto exec){::pushmi::set_value(single, exec);}};
  450. meter.measure([&]{
  451. counter.store(1'000);
  452. tr | op::submit([&](auto exec) { recurse(exec); });
  453. while(counter.load() > 0);
  454. return counter.load();
  455. });
  456. })
  457. NONIUS_BENCHMARK("trampoline flow_many_sender 1'000", [](nonius::chronometer meter){
  458. std::atomic<int> counter{0};
  459. auto tr = mi::trampoline();
  460. using TR = decltype(tr);
  461. std::vector<int> values(1'000);
  462. std::iota(values.begin(), values.end(), 1);
  463. auto f = op::flow_from(values, tr) | op::tap([&](int){
  464. --counter;
  465. });
  466. meter.measure([&]{
  467. counter.store(1'000);
  468. f | op::for_each(mi::make_receiver());
  469. while(counter.load() > 0);
  470. return counter.load();
  471. });
  472. })
  473. NONIUS_BENCHMARK("pool{1} submit 1'000", [](nonius::chronometer meter){
  474. mi::pool pl{std::max(1u,std::thread::hardware_concurrency())};
  475. auto pe = pl.executor();
  476. using PE = decltype(pe);
  477. std::atomic<int> counter{0};
  478. countdownsingle single{counter};
  479. meter.measure([&]{
  480. counter.store(1'000);
  481. pe | op::submit(single);
  482. while(counter.load() > 0);
  483. return counter.load();
  484. });
  485. })
  486. NONIUS_BENCHMARK("pool{hardware_concurrency} submit 1'000", [](nonius::chronometer meter){
  487. mi::pool pl{std::min(1u,std::thread::hardware_concurrency())};
  488. auto pe = pl.executor();
  489. using PE = decltype(pe);
  490. std::atomic<int> counter{0};
  491. countdownsingle single{counter};
  492. meter.measure([&]{
  493. counter.store(1'000);
  494. pe | op::submit(single);
  495. while(counter.load() > 0);
  496. return counter.load();
  497. });
  498. })
  499. NONIUS_BENCHMARK("new thread submit 1'000", [](nonius::chronometer meter){
  500. auto nt = mi::new_thread();
  501. using NT = decltype(nt);
  502. std::atomic<int> counter{0};
  503. countdownsingle single{counter};
  504. meter.measure([&]{
  505. counter.store(1'000);
  506. nt | op::submit(single);
  507. while(counter.load() > 0);
  508. return counter.load();
  509. });
  510. })
  511. NONIUS_BENCHMARK("new thread blocking_submit 1'000", [](nonius::chronometer meter){
  512. auto nt = mi::new_thread();
  513. using NT = decltype(nt);
  514. std::atomic<int> counter{0};
  515. countdownsingle single{counter};
  516. meter.measure([&]{
  517. counter.store(1'000);
  518. nt | op::blocking_submit(single);
  519. return counter.load();
  520. });
  521. })
  522. NONIUS_BENCHMARK("new thread + time submit 1'000", [](nonius::chronometer meter){
  523. auto nt = mi::new_thread();
  524. using NT = decltype(nt);
  525. auto time = mi::time_source<>{};
  526. auto tnt = time.make(mi::systemNowF{}, [nt](){ return nt; })();
  527. using TNT = decltype(tnt);
  528. std::atomic<int> counter{0};
  529. countdownsingle single{counter};
  530. meter.measure([&]{
  531. counter.store(1'000);
  532. tnt | op::submit(single);
  533. while(counter.load() > 0);
  534. return counter.load();
  535. });
  536. time.join();
  537. })