Parallel-inl.h

/*
 * Copyright 2014-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FOLLY_GEN_PARALLEL_H_
#error This file may only be included from folly/gen/ParallelGen.h
#endif

#include <folly/MPMCQueue.h>
#include <folly/ScopeGuard.h>
#include <folly/experimental/EventCount.h>

#include <glog/logging.h> // CHECK()
#include <unistd.h> // sysconf(_SC_NPROCESSORS_CONF)

#include <algorithm> // std::max
#include <atomic>
#include <thread>
#include <vector>

namespace folly {
namespace gen {
namespace detail {

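/**
 * ClosableMPMCQueue - A wrapper around MPMCQueue that tracks how many
 * producers and consumers are attached. When the last producer closes,
 * blocked readers are woken and readUnlessClosed() returns false once the
 * queue drains; when the last consumer closes, blocked writers are woken and
 * writeUnlessClosed() returns false. This lets worker threads shut down
 * cleanly instead of blocking forever.
 */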
template <typename T>
class ClosableMPMCQueue {
  MPMCQueue<T> queue_;
  std::atomic<size_t> producers_{0};
  std::atomic<size_t> consumers_{0};
  folly::EventCount wakeProducer_;
  folly::EventCount wakeConsumer_;

 public:
  explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}

  ~ClosableMPMCQueue() {
    CHECK(!producers());
    CHECK(!consumers());
  }

  void openProducer() {
    ++producers_;
  }

  void openConsumer() {
    ++consumers_;
  }

  void closeInputProducer() {
    size_t producers = producers_--;
    CHECK(producers);
    if (producers == 1) { // last producer
      wakeConsumer_.notifyAll();
    }
  }

  void closeOutputConsumer() {
    size_t consumers = consumers_--;
    CHECK(consumers);
    if (consumers == 1) { // last consumer
      wakeProducer_.notifyAll();
    }
  }

  size_t producers() const {
    return producers_.load(std::memory_order_acquire);
  }

  size_t consumers() const {
    return consumers_.load(std::memory_order_acquire);
  }

  template <typename... Args>
  bool writeUnlessFull(Args&&... args) noexcept {
    if (queue_.write(std::forward<Args>(args)...)) {
      // wake consumers to pick up new value
      wakeConsumer_.notify();
      return true;
    }
    return false;
  }

  template <typename... Args>
  bool writeUnlessClosed(Args&&... args) {
    // write if there's room
    while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
      // if write fails, check if there are still consumers listening
      auto key = wakeProducer_.prepareWait();
      if (!consumers()) {
        // no consumers left; bail out
        wakeProducer_.cancelWait();
        return false;
      }
      wakeProducer_.wait(key);
    }
    // wake consumers to pick up new value
    wakeConsumer_.notify();
    return true;
  }

  bool readUnlessEmpty(T& out) {
    if (queue_.read(out)) {
      // wake producers to fill empty space
      wakeProducer_.notify();
      return true;
    }
    return false;
  }

  bool readUnlessClosed(T& out) {
    while (!queue_.readIfNotEmpty(out)) {
      // if read fails, check if there are still producers writing
      auto key = wakeConsumer_.prepareWait();
      if (!producers()) {
        // no producers left; cancel the pending wait and bail out
        wakeConsumer_.cancelWait();
        wakeProducer_.notify();
        return false;
      }
      wakeConsumer_.wait(key);
    }
    // wake writers blocked by full queue
    wakeProducer_.notify();
    return true;
  }
};

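/**
 * Sub - Operator that wraps a sink: composing a source applies the sink to it
 * and yields the single resulting value, so a full reduction (e.g. a count or
 * a sum) can run inside each parallel worker.
 */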
template <class Sink>
class Sub : public Operator<Sub<Sink>> {
  Sink sink_;

 public:
  explicit Sub(Sink sink) : sink_(sink) {}

  template <
      class Value,
      class Source,
      class Result =
          decltype(std::declval<Sink>().compose(std::declval<Source>())),
      class Just = SingleCopy<typename std::decay<Result>::type>>
  Just compose(const GenImpl<Value, Source>& source) const {
    return Just(source | sink_);
  }
};

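/**
 * Parallel - Operator that fans values from the source out to a pool of
 * worker threads, runs the wrapped ops pipeline on each worker, and merges
 * the results back into a single output sequence (in no particular order).
 * A thread count of zero defaults to the number of configured processors.
 *
 * Normally reached through the 'parallel' helper declared in ParallelGen.h;
 * a rough usage sketch (costlyFn is a placeholder):
 *
 *   auto results = from(inputs)
 *     | parallel(map(costlyFn))
 *     | as<std::vector>();
 */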
template <class Ops>
class Parallel : public Operator<Parallel<Ops>> {
  Ops ops_;
  size_t threads_;

 public:
  Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}

  template <
      class Input,
      class Source,
      class InputDecayed = typename std::decay<Input>::type,
      class Composed =
          decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
      class Output = typename Composed::ValueType,
      class OutputDecayed = typename std::decay<Output>::type>
  class Generator : public GenImpl<
                        OutputDecayed&&,
                        Generator<
                            Input,
                            Source,
                            InputDecayed,
                            Composed,
                            Output,
                            OutputDecayed>> {
    Source source_;
    Ops ops_;
    size_t threads_;

    using InQueue = ClosableMPMCQueue<InputDecayed>;
    using OutQueue = ClosableMPMCQueue<OutputDecayed>;

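    /**
     * Puller - A generator over the input queue: it keeps yielding values
     * read from the queue until the last producer closes it.
     */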
    class Puller : public GenImpl<InputDecayed&&, Puller> {
      InQueue* queue_;

     public:
      explicit Puller(InQueue* queue) : queue_(queue) {}

      template <class Handler>
      bool apply(Handler&& handler) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          if (!handler(std::move(input))) {
            return false;
          }
        }
        return true;
      }

      template <class Body>
      void foreach(Body&& body) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          body(std::move(input));
        }
      }
    };

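    /**
     * Pusher - A sink that writes each generated value into the output queue.
     * With all == false it uses apply() and stops as soon as a write fails
     * because every consumer has closed; with all == true it uses foreach()
     * and drains its source unconditionally, ignoring failed writes.
     */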
    template <bool all = false>
    class Pusher : public Operator<Pusher<all>> {
      OutQueue* queue_;

     public:
      explicit Pusher(OutQueue* queue) : queue_(queue) {}

      template <class Value, class InnerSource>
      void compose(const GenImpl<Value, InnerSource>& source) const {
        if (all) {
          source.self().foreach([&](Value value) {
            queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        } else {
          source.self().apply([&](Value value) {
            return queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        }
      }
    };

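    /**
     * Executor - Owns the input/output queues and the pool of worker threads.
     * Each worker pulls inputs from inQueue_, runs them through *ops_, and
     * pushes the results into outQueue_. The destructor closes whichever
     * queue ends are still open on the caller's side and joins the workers.
     */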
    template <bool all = false>
    class Executor {
      InQueue inQueue_;
      OutQueue outQueue_;
      Puller puller_;
      Pusher<all> pusher_;

      std::vector<std::thread> workers_;
      const Ops* ops_;

      void work() {
        puller_ | *ops_ | pusher_;
      }

     public:
      Executor(size_t threads, const Ops* ops)
          : inQueue_(threads * 4),
            outQueue_(threads * 4),
            puller_(&inQueue_),
            pusher_(&outQueue_),
            ops_(ops) {
        inQueue_.openProducer();
        outQueue_.openConsumer();
        for (size_t t = 0; t < threads; ++t) {
          inQueue_.openConsumer();
          outQueue_.openProducer();
          workers_.emplace_back([this] {
            SCOPE_EXIT {
              inQueue_.closeOutputConsumer();
              outQueue_.closeInputProducer();
            };
            this->work();
          });
        }
      }

      ~Executor() {
        if (inQueue_.producers()) {
          inQueue_.closeInputProducer();
        }
        if (outQueue_.consumers()) {
          outQueue_.closeOutputConsumer();
        }
        while (!workers_.empty()) {
          workers_.back().join();
          workers_.pop_back();
        }
        CHECK(!inQueue_.consumers());
        CHECK(!outQueue_.producers());
      }

      void closeInputProducer() {
        inQueue_.closeInputProducer();
      }

      void closeOutputConsumer() {
        outQueue_.closeOutputConsumer();
      }

      bool writeUnlessClosed(Input&& input) {
        return inQueue_.writeUnlessClosed(std::forward<Input>(input));
      }

      bool writeUnlessFull(Input&& input) {
        return inQueue_.writeUnlessFull(std::forward<Input>(input));
      }

      bool readUnlessClosed(OutputDecayed& output) {
        return outQueue_.readUnlessClosed(output);
      }

      bool readUnlessEmpty(OutputDecayed& output) {
        return outQueue_.readUnlessEmpty(output);
      }
    };

   public:
    Generator(Source source, Ops ops, size_t threads)
        : source_(std::move(source)),
          ops_(std::move(ops)),
          threads_(
              threads
                  ? threads
                  : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}

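    // Early-terminable path: feed each input with a non-blocking write when
    // possible; when the input queue is full, first drain any ready outputs
    // to the handler, then fall back to a blocking write. After the source is
    // exhausted, close the producer side and drain the remaining outputs
    // until the workers finish.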
    template <class Handler>
    bool apply(Handler&& handler) const {
      Executor<false> executor(threads_, &ops_);
      bool more = true;
      source_.apply([&](Input input) {
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return true;
        }
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          if (!handler(std::move(output))) {
            more = false;
            return false;
          }
        }
        if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
          return false;
        }
        return true;
      });
      executor.closeInputProducer();

      if (more) {
        OutputDecayed output;
        while (executor.readUnlessClosed(output)) {
          if (!handler(std::move(output))) {
            more = false;
            break;
          }
        }
      }
      executor.closeOutputConsumer();

      return more;
    }

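    // Unconditional path: same feeding strategy as apply(), but the body
    // consumes every output and nothing can stop the pipeline early, so the
    // fallback blocking write is expected to succeed (hence the CHECK).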
    template <class Body>
    void foreach(Body&& body) const {
      Executor<true> executor(threads_, &ops_);
      source_.foreach([&](Input input) {
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return;
        }
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          body(std::move(output));
        }
        CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
      });
      executor.closeInputProducer();

      OutputDecayed output;
      while (executor.readUnlessClosed(output)) {
        body(std::move(output));
      }
      executor.closeOutputConsumer();
    }
  };

  template <class Value, class Source>
  Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
    return Generator<Value, Source>(source.self(), ops_, threads_);
  }

  template <class Value, class Source>
  Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
    return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
  }
};

/**
 * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
 * maximum chunk size.
 *
 * Usually used through the 'chunked' helper, like:
 *
 *   int n
 *     = chunked(values)
 *     | parallel  // each thread processes a chunk
 *     | concat    // but can still process values one at a time
 *     | filter(isPrime)
 *     | atomic_count;
 */
template <class Iterator>
class ChunkedRangeSource
    : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
  int chunkSize_;
  Range<Iterator> range_;

 public:
  ChunkedRangeSource() = default;
  ChunkedRangeSource(int chunkSize, Range<Iterator> range)
      : chunkSize_(chunkSize), range_(std::move(range)) {}

  template <class Handler>
  bool apply(Handler&& handler) const {
    auto remaining = range_;
    while (!remaining.empty()) {
      auto chunk = remaining.subpiece(0, chunkSize_);
      remaining.advance(chunk.size());
      auto gen = RangeSource<Iterator>(chunk);
      if (!handler(std::move(gen))) {
        return false;
      }
    }
    return true;
  }
};

} // namespace detail
} // namespace gen
} // namespace folly