123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- /*
- * Copyright 2018-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <algorithm>
- #include <cassert>
- #include <exception>
- #include <iostream>
- #include <numeric>
- #include <vector>
- #include <folly/experimental/pushmi/examples/pool.h>
- #include <folly/experimental/pushmi/examples/reduce.h>
- using namespace pushmi::aliases;
- template <class Executor, class Allocator = std::allocator<char>>
- auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) {
- return [e, a](
- auto init,
- auto selector,
- auto input,
- auto&& func,
- auto sb,
- auto se,
- auto out) {
- using RS = decltype(selector);
- using F = std::conditional_t<
- std::is_lvalue_reference<decltype(func)>::value,
- decltype(func),
- typename std::remove_reference<decltype(func)>::type>;
- using Out = decltype(out);
- try {
- typename std::allocator_traits<Allocator>::template rebind_alloc<char>
- allocState(a);
- auto shared_state = std::allocate_shared<std::tuple<
- std::exception_ptr, // first exception
- Out, // destination
- RS, // selector
- F, // func
- std::atomic<decltype(init(input))>, // accumulation
- std::atomic<std::size_t>, // pending
- std::atomic<std::size_t> // exception count (protects assignment to
- // first exception)
- >>(
- allocState,
- std::exception_ptr{},
- std::move(out),
- std::move(selector),
- (decltype(func)&&)func,
- init(std::move(input)),
- 1,
- 0);
- e | op::submit([e, sb, se, shared_state](auto) {
- auto stepDone = [](auto shared_state) {
- // pending
- if (--std::get<5>(*shared_state) == 0) {
- // first exception
- if (std::get<0>(*shared_state)) {
- mi::set_error(
- std::get<1>(*shared_state), std::get<0>(*shared_state));
- return;
- }
- try {
- // selector(accumulation)
- auto result = std::get<2>(*shared_state)(
- std::move(std::get<4>(*shared_state).load()));
- mi::set_value(std::get<1>(*shared_state), std::move(result));
- } catch (...) {
- mi::set_error(
- std::get<1>(*shared_state), std::current_exception());
- }
- }
- };
- for (decltype(sb) idx{sb}; idx != se;
- ++idx, ++std::get<5>(*shared_state)) {
- e | op::submit([shared_state, idx, stepDone](auto ex) {
- try {
- // this indicates to me that bulk is not the right abstraction
- auto old = std::get<4>(*shared_state).load();
- auto step = old;
- do {
- step = old;
- // func(accumulation, idx)
- std::get<3> (*shared_state)(step, idx);
- } while (!std::get<4>(*shared_state)
- .compare_exchange_strong(old, step));
- } catch (...) {
- // exception count
- if (std::get<6>(*shared_state)++ == 0) {
- // store first exception
- std::get<0>(*shared_state) = std::current_exception();
- } // else eat the exception
- }
- stepDone(shared_state);
- });
- }
- stepDone(shared_state);
- });
- } catch (...) {
- e |
- op::submit([out = std::move(out), ep = std::current_exception()](
- auto) mutable { mi::set_error(out, ep); });
- }
- };
- }
- int main() {
- mi::pool p{std::max(1u, std::thread::hardware_concurrency())};
- std::vector<int> vec(10);
- std::fill(vec.begin(), vec.end(), 4);
- auto fortyTwo = mi::reduce(
- naive_executor_bulk_target(p.executor()),
- vec.begin(),
- vec.end(),
- 2,
- std::plus<>{});
- assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo);
- std::cout << "OK" << std::endl;
- p.wait();
- }
|