// ParallelMap-inl.h 7.0 KB
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef FOLLY_GEN_PARALLELMAP_H_
  17. #error This file may only be included from folly/gen/ParallelMap.h
  18. #endif
  19. #include <atomic>
  20. #include <cassert>
  21. #include <thread>
  22. #include <type_traits>
  23. #include <utility>
  24. #include <vector>
  25. #include <folly/MPMCPipeline.h>
  26. #include <folly/experimental/EventCount.h>
  27. #include <folly/functional/Invoke.h>
  28. namespace folly {
  29. namespace gen {
  30. namespace detail {
  31. /**
  32. * PMap - Map in parallel (using threads). For producing a sequence of
  33. * values by passing each value from a source collection through a
  34. * predicate while running the predicate in parallel in different
  35. * threads.
  36. *
  37. * This type is usually used through the 'pmap' helper function:
  38. *
  39. * auto squares = seq(1, 10) | pmap(fibonacci, 4) | sum;
  40. */
  41. template <class Predicate>
  42. class PMap : public Operator<PMap<Predicate>> {
  43. Predicate pred_;
  44. size_t nThreads_;
  45. public:
  46. PMap() = default;
  47. PMap(Predicate pred, size_t nThreads)
  48. : pred_(std::move(pred)), nThreads_(nThreads) {}
  49. template <
  50. class Value,
  51. class Source,
  52. class Input = typename std::decay<Value>::type,
  53. class Output =
  54. typename std::decay<invoke_result_t<Predicate, Value>>::type>
  55. class Generator
  56. : public GenImpl<Output, Generator<Value, Source, Input, Output>> {
  57. Source source_;
  58. Predicate pred_;
  59. const size_t nThreads_;
  60. class ExecutionPipeline {
  61. std::vector<std::thread> workers_;
  62. std::atomic<bool> done_{false};
  63. const Predicate& pred_;
  64. MPMCPipeline<Input, Output> pipeline_;
  65. EventCount wake_;
  66. public:
  67. ExecutionPipeline(const Predicate& pred, size_t nThreads)
  68. : pred_(pred), pipeline_(nThreads, nThreads) {
  69. workers_.reserve(nThreads);
  70. for (size_t i = 0; i < nThreads; i++) {
  71. workers_.push_back(std::thread([this] { this->predApplier(); }));
  72. }
  73. }
  74. ~ExecutionPipeline() {
  75. assert(pipeline_.sizeGuess() == 0);
  76. assert(done_.load());
  77. for (auto& w : workers_) {
  78. w.join();
  79. }
  80. }
  81. void stop() {
  82. // prevent workers from consuming more than we produce.
  83. done_.store(true, std::memory_order_release);
  84. wake_.notifyAll();
  85. }
  86. bool write(Value&& value) {
  87. bool wrote = pipeline_.write(std::forward<Value>(value));
  88. if (wrote) {
  89. wake_.notify();
  90. }
  91. return wrote;
  92. }
  93. void blockingWrite(Value&& value) {
  94. pipeline_.blockingWrite(std::forward<Value>(value));
  95. wake_.notify();
  96. }
  97. bool read(Output& out) {
  98. return pipeline_.read(out);
  99. }
  100. void blockingRead(Output& out) {
  101. pipeline_.blockingRead(out);
  102. }
  103. private:
  104. void predApplier() {
  105. // Each thread takes a value from the pipeline_, runs the
  106. // predicate and enqueues the result. The pipeline preserves
  107. // ordering. NOTE: don't use blockingReadStage<0> to read from
  108. // the pipeline_ as there may not be any: end-of-data is signaled
  109. // separately using done_/wake_.
  110. Input in;
  111. for (;;) {
  112. auto key = wake_.prepareWait();
  113. typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
  114. if (pipeline_.template readStage<0>(ticket, in)) {
  115. wake_.cancelWait();
  116. Output out = pred_(std::move(in));
  117. pipeline_.template blockingWriteStage<0>(ticket, std::move(out));
  118. continue;
  119. }
  120. if (done_.load(std::memory_order_acquire)) {
  121. wake_.cancelWait();
  122. break;
  123. }
  124. // Not done_, but no items in the queue.
  125. wake_.wait(key);
  126. }
  127. }
  128. };
  129. public:
  130. Generator(Source source, const Predicate& pred, size_t nThreads)
  131. : source_(std::move(source)),
  132. pred_(pred),
  133. nThreads_(nThreads ? nThreads : sysconf(_SC_NPROCESSORS_ONLN)) {}
  134. template <class Body>
  135. void foreach(Body&& body) const {
  136. ExecutionPipeline pipeline(pred_, nThreads_);
  137. size_t wrote = 0;
  138. size_t read = 0;
  139. source_.foreach([&](Value value) {
  140. if (pipeline.write(std::forward<Value>(value))) {
  141. // input queue not yet full, saturate it before we process
  142. // anything downstream
  143. ++wrote;
  144. return;
  145. }
  146. // input queue full; drain ready items from the queue
  147. Output out;
  148. while (pipeline.read(out)) {
  149. ++read;
  150. body(std::move(out));
  151. }
  152. // write the value we were going to write before we made room.
  153. pipeline.blockingWrite(std::forward<Value>(value));
  154. ++wrote;
  155. });
  156. pipeline.stop();
  157. // flush the output queue
  158. while (read < wrote) {
  159. Output out;
  160. pipeline.blockingRead(out);
  161. ++read;
  162. body(std::move(out));
  163. }
  164. }
  165. template <class Handler>
  166. bool apply(Handler&& handler) const {
  167. ExecutionPipeline pipeline(pred_, nThreads_);
  168. size_t wrote = 0;
  169. size_t read = 0;
  170. bool more = true;
  171. source_.apply([&](Value value) {
  172. if (pipeline.write(std::forward<Value>(value))) {
  173. // input queue not yet full, saturate it before we process
  174. // anything downstream
  175. ++wrote;
  176. return true;
  177. }
  178. // input queue full; drain ready items from the queue
  179. Output out;
  180. while (pipeline.read(out)) {
  181. ++read;
  182. if (!handler(std::move(out))) {
  183. more = false;
  184. return false;
  185. }
  186. }
  187. // write the value we were going to write before we made room.
  188. pipeline.blockingWrite(std::forward<Value>(value));
  189. ++wrote;
  190. return true;
  191. });
  192. pipeline.stop();
  193. // flush the output queue
  194. while (read < wrote) {
  195. Output out;
  196. pipeline.blockingRead(out);
  197. ++read;
  198. if (more) {
  199. more = more && handler(std::move(out));
  200. }
  201. }
  202. return more;
  203. }
  204. static constexpr bool infinite = Source::infinite;
  205. };
  206. template <class Source, class Value, class Gen = Generator<Value, Source>>
  207. Gen compose(GenImpl<Value, Source>&& source) const {
  208. return Gen(std::move(source.self()), pred_, nThreads_);
  209. }
  210. template <class Source, class Value, class Gen = Generator<Value, Source>>
  211. Gen compose(const GenImpl<Value, Source>& source) const {
  212. return Gen(source.self(), pred_, nThreads_);
  213. }
  214. };
  215. } // namespace detail
  216. } // namespace gen
  217. } // namespace folly