FutureDAG.h 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /*
  2. * Copyright 2015-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <folly/futures/Future.h>
  18. #include <folly/futures/SharedPromise.h>
  19. namespace folly {
  20. class FutureDAG : public std::enable_shared_from_this<FutureDAG> {
  21. public:
  22. static std::shared_ptr<FutureDAG> create() {
  23. return std::shared_ptr<FutureDAG>(new FutureDAG());
  24. }
  25. typedef size_t Handle;
  26. typedef std::function<Future<Unit>()> FutureFunc;
  27. Handle add(FutureFunc func, Executor* executor = nullptr) {
  28. nodes.emplace_back(std::move(func), executor);
  29. return nodes.size() - 1;
  30. }
  31. void remove(Handle a) {
  32. if (a >= nodes.size()) {
  33. return;
  34. }
  35. if (nodes[a].hasDependents) {
  36. for (auto& node : nodes) {
  37. auto& deps = node.dependencies;
  38. deps.erase(
  39. std::remove(std::begin(deps), std::end(deps), a), std::end(deps));
  40. for (Handle& handle : deps) {
  41. if (handle > a) {
  42. handle--;
  43. }
  44. }
  45. }
  46. }
  47. nodes.erase(nodes.begin() + a);
  48. }
  49. void reset() {
  50. // Delete all but source node, and reset dependency properties
  51. Handle source_node;
  52. std::unordered_set<Handle> memo;
  53. for (auto& node : nodes) {
  54. for (Handle handle : node.dependencies) {
  55. memo.insert(handle);
  56. }
  57. }
  58. for (Handle handle = 0; handle < nodes.size(); handle++) {
  59. if (memo.find(handle) == memo.end()) {
  60. source_node = handle;
  61. }
  62. }
  63. nodes.erase(nodes.begin(), nodes.begin() + source_node);
  64. nodes.erase(nodes.begin() + 1, nodes.end());
  65. nodes[0].hasDependents = false;
  66. nodes[0].dependencies.clear();
  67. }
  68. void dependency(Handle a, Handle b) {
  69. nodes[b].dependencies.push_back(a);
  70. nodes[a].hasDependents = true;
  71. }
  72. void clean_state(Handle source, Handle sink) {
  73. for (auto handle : nodes[sink].dependencies) {
  74. nodes[handle].hasDependents = false;
  75. }
  76. nodes[0].hasDependents = false;
  77. remove(source);
  78. remove(sink);
  79. }
  80. Future<Unit> go() {
  81. if (hasCycle()) {
  82. return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
  83. }
  84. std::vector<Handle> rootNodes;
  85. std::vector<Handle> leafNodes;
  86. for (Handle handle = 0; handle < nodes.size(); handle++) {
  87. if (nodes[handle].dependencies.empty()) {
  88. rootNodes.push_back(handle);
  89. }
  90. if (!nodes[handle].hasDependents) {
  91. leafNodes.push_back(handle);
  92. }
  93. }
  94. auto sinkHandle = add([] { return Future<Unit>(); });
  95. for (auto handle : leafNodes) {
  96. dependency(handle, sinkHandle);
  97. }
  98. auto sourceHandle = add(nullptr);
  99. for (auto handle : rootNodes) {
  100. dependency(sourceHandle, handle);
  101. }
  102. for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
  103. std::vector<Future<Unit>> dependencies;
  104. for (auto depHandle : nodes[handle].dependencies) {
  105. dependencies.push_back(nodes[depHandle].promise.getFuture());
  106. }
  107. collect(dependencies)
  108. .via(nodes[handle].executor)
  109. .thenValue([this, handle](std::vector<Unit>&&) {
  110. nodes[handle].func().then([this, handle](Try<Unit>&& t) {
  111. nodes[handle].promise.setTry(std::move(t));
  112. });
  113. })
  114. .onError([this, handle](exception_wrapper ew) {
  115. nodes[handle].promise.setException(std::move(ew));
  116. });
  117. }
  118. nodes[sourceHandle].promise.setValue();
  119. return nodes[sinkHandle].promise.getFuture().thenValue(
  120. [that = shared_from_this(), sourceHandle, sinkHandle](Unit) {
  121. that->clean_state(sourceHandle, sinkHandle);
  122. });
  123. }
  124. private:
  125. FutureDAG() = default;
  126. bool hasCycle() {
  127. // Perform a modified topological sort to detect cycles
  128. std::vector<std::vector<Handle>> dependencies;
  129. for (auto& node : nodes) {
  130. dependencies.push_back(node.dependencies);
  131. }
  132. std::vector<size_t> dependents(nodes.size());
  133. for (auto& dependencyEdges : dependencies) {
  134. for (auto handle : dependencyEdges) {
  135. dependents[handle]++;
  136. }
  137. }
  138. std::vector<Handle> handles;
  139. for (Handle handle = 0; handle < nodes.size(); handle++) {
  140. if (!nodes[handle].hasDependents) {
  141. handles.push_back(handle);
  142. }
  143. }
  144. while (!handles.empty()) {
  145. auto handle = handles.back();
  146. handles.pop_back();
  147. while (!dependencies[handle].empty()) {
  148. auto dependency = dependencies[handle].back();
  149. dependencies[handle].pop_back();
  150. if (--dependents[dependency] == 0) {
  151. handles.push_back(dependency);
  152. }
  153. }
  154. }
  155. for (auto& dependencyEdges : dependencies) {
  156. if (!dependencyEdges.empty()) {
  157. return true;
  158. }
  159. }
  160. return false;
  161. }
  162. struct Node {
  163. Node(FutureFunc&& funcArg, Executor* executorArg)
  164. : func(std::move(funcArg)), executor(executorArg) {}
  165. FutureFunc func{nullptr};
  166. Executor* executor{nullptr};
  167. SharedPromise<Unit> promise;
  168. std::vector<Handle> dependencies;
  169. bool hasDependents{false};
  170. bool visited{false};
  171. };
  172. std::vector<Node> nodes;
  173. };
  174. // Polymorphic functor implementation
  175. template <typename T>
  176. class FutureDAGFunctor {
  177. public:
  178. std::shared_ptr<FutureDAG> dag = FutureDAG::create();
  179. T state;
  180. std::vector<T> dep_states;
  181. T result() {
  182. return state;
  183. }
  184. // execReset() runs DAG & clears all nodes except for source
  185. void execReset() {
  186. this->dag->go().get();
  187. this->dag->reset();
  188. }
  189. void exec() {
  190. this->dag->go().get();
  191. }
  192. virtual void operator()() {}
  193. explicit FutureDAGFunctor(T init_val) : state(init_val) {}
  194. FutureDAGFunctor() : state() {}
  195. virtual ~FutureDAGFunctor() {}
  196. };
  197. } // namespace folly