/* * Copyright 2018-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include using namespace pushmi::aliases; template > auto naive_executor_bulk_target(Executor e, Allocator a = Allocator{}) { return [e, a]( auto init, auto selector, auto input, auto&& func, auto sb, auto se, auto out) { using RS = decltype(selector); using F = std::conditional_t< std::is_lvalue_reference::value, decltype(func), typename std::remove_reference::type>; using Out = decltype(out); try { typename std::allocator_traits::template rebind_alloc allocState(a); auto shared_state = std::allocate_shared, // accumulation std::atomic, // pending std::atomic // exception count (protects assignment to // first exception) >>( allocState, std::exception_ptr{}, std::move(out), std::move(selector), (decltype(func)&&)func, init(std::move(input)), 1, 0); e | op::submit([e, sb, se, shared_state](auto) { auto stepDone = [](auto shared_state) { // pending if (--std::get<5>(*shared_state) == 0) { // first exception if (std::get<0>(*shared_state)) { mi::set_error( std::get<1>(*shared_state), std::get<0>(*shared_state)); return; } try { // selector(accumulation) auto result = std::get<2>(*shared_state)( std::move(std::get<4>(*shared_state).load())); mi::set_value(std::get<1>(*shared_state), std::move(result)); } catch (...) { mi::set_error( std::get<1>(*shared_state), std::current_exception()); } } }; for (decltype(sb) idx{sb}; idx != se; ++idx, ++std::get<5>(*shared_state)) { e | op::submit([shared_state, idx, stepDone](auto ex) { try { // this indicates to me that bulk is not the right abstraction auto old = std::get<4>(*shared_state).load(); auto step = old; do { step = old; // func(accumulation, idx) std::get<3> (*shared_state)(step, idx); } while (!std::get<4>(*shared_state) .compare_exchange_strong(old, step)); } catch (...) { // exception count if (std::get<6>(*shared_state)++ == 0) { // store first exception std::get<0>(*shared_state) = std::current_exception(); } // else eat the exception } stepDone(shared_state); }); } stepDone(shared_state); }); } catch (...) { e | op::submit([out = std::move(out), ep = std::current_exception()]( auto) mutable { mi::set_error(out, ep); }); } }; } int main() { mi::pool p{std::max(1u, std::thread::hardware_concurrency())}; std::vector vec(10); std::fill(vec.begin(), vec.end(), 4); auto fortyTwo = mi::reduce( naive_executor_bulk_target(p.executor()), vec.begin(), vec.end(), 2, std::plus<>{}); assert(std::accumulate(vec.begin(), vec.end(), 2) == fortyTwo); std::cout << "OK" << std::endl; p.wait(); }