pool.h 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #pragma once
  2. /*
  3. * Copyright 2018-present Facebook, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #include <experimental/thread_pool> // @manual
  18. #include <folly/experimental/pushmi/executor.h>
  19. #include <folly/experimental/pushmi/time_single_sender.h>
  20. #include <folly/experimental/pushmi/trampoline.h>
  21. #if __cpp_deduction_guides >= 201703
  22. #define MAKE(x) x MAKE_
  23. #define MAKE_(...) \
  24. { __VA_ARGS__ }
  25. #else
  26. #define MAKE(x) make_##x
  27. #endif
  28. namespace pushmi {
  29. using std::experimental::static_thread_pool;
  30. namespace execution = std::experimental::execution;
  31. template <class Executor>
  32. struct pool_executor {
  33. using properties = property_set<
  34. is_sender<>,
  35. is_executor<>,
  36. is_never_blocking<>,
  37. is_concurrent_sequence<>,
  38. is_single<>>;
  39. using e_t = Executor;
  40. e_t e;
  41. explicit pool_executor(e_t e) : e(std::move(e)) {}
  42. auto executor() {
  43. return *this;
  44. }
  45. PUSHMI_TEMPLATE(class Out)
  46. (requires Receiver<Out>)void submit(Out out) const {
  47. e.execute([e = *this, out = std::move(out)]() mutable {
  48. ::pushmi::set_value(out, e);
  49. });
  50. }
  51. };
  52. class pool {
  53. static_thread_pool p;
  54. public:
  55. inline explicit pool(std::size_t threads) : p(threads) {}
  56. inline auto executor() {
  57. auto exec = execution::require(
  58. p.executor(), execution::never_blocking, execution::oneway);
  59. return pool_executor<decltype(exec)>{exec};
  60. }
  61. inline void stop() {
  62. p.stop();
  63. }
  64. inline void wait() {
  65. p.wait();
  66. }
  67. };
  68. } // namespace pushmi