Future-inl.h 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <algorithm>
  18. #include <cassert>
  19. #include <chrono>
  20. #include <thread>
  21. #include <folly/Optional.h>
  22. #include <folly/executors/InlineExecutor.h>
  23. #include <folly/executors/QueuedImmediateExecutor.h>
  24. #include <folly/futures/detail/Core.h>
  25. #include <folly/synchronization/Baton.h>
  26. #ifndef FOLLY_FUTURE_USING_FIBER
  27. #if FOLLY_MOBILE || defined(__APPLE__)
  28. #define FOLLY_FUTURE_USING_FIBER 0
  29. #else
  30. #define FOLLY_FUTURE_USING_FIBER 1
  31. #include <folly/fibers/Baton.h>
  32. #endif
  33. #endif
  34. namespace folly {
  35. class Timekeeper;
  36. namespace futures {
  37. namespace detail {
  38. #if FOLLY_FUTURE_USING_FIBER
  39. typedef folly::fibers::Baton FutureBatonType;
  40. #else
  41. typedef folly::Baton<> FutureBatonType;
  42. #endif
  43. } // namespace detail
  44. } // namespace futures
  45. namespace detail {
  // Declaration only; the definition is not in this file.
  // withinImplementation() calls this when its caller passes no Timekeeper and
  // then checks the returned pointer for null before using it.
  std::shared_ptr<Timekeeper> getTimekeeperSingleton();
  47. } // namespace detail
  48. namespace futures {
  49. namespace detail {
  50. // Guarantees that the stored functor is destructed before the stored promise
  51. // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
  52. template <typename T, typename F>
  53. class CoreCallbackState {
  54. using DF = _t<std::decay<F>>;
  55. public:
  56. CoreCallbackState(Promise<T>&& promise, F&& func) noexcept(
  57. noexcept(DF(std::declval<F&&>())))
  58. : func_(std::forward<F>(func)), promise_(std::move(promise)) {
  59. assert(before_barrier());
  60. }
  61. CoreCallbackState(CoreCallbackState&& that) noexcept(
  62. noexcept(DF(std::declval<F&&>()))) {
  63. if (that.before_barrier()) {
  64. new (&func_) DF(std::forward<F>(that.func_));
  65. promise_ = that.stealPromise();
  66. }
  67. }
  68. CoreCallbackState& operator=(CoreCallbackState&&) = delete;
  69. ~CoreCallbackState() {
  70. if (before_barrier()) {
  71. stealPromise();
  72. }
  73. }
  74. template <typename... Args>
  75. auto invoke(Args&&... args) noexcept(
  76. noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
  77. assert(before_barrier());
  78. return std::forward<F>(func_)(std::forward<Args>(args)...);
  79. }
  80. template <typename... Args>
  81. auto tryInvoke(Args&&... args) noexcept {
  82. return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
  83. }
  84. void setTry(Try<T>&& t) {
  85. stealPromise().setTry(std::move(t));
  86. }
  87. void setException(exception_wrapper&& ew) {
  88. stealPromise().setException(std::move(ew));
  89. }
  90. Promise<T> stealPromise() noexcept {
  91. assert(before_barrier());
  92. func_.~DF();
  93. return std::move(promise_);
  94. }
  95. private:
  96. bool before_barrier() const noexcept {
  97. return !promise_.isFulfilled();
  98. }
  99. union {
  100. DF func_;
  101. };
  102. Promise<T> promise_{Promise<T>::makeEmpty()};
  103. };
  104. template <typename T, typename F>
  105. auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
  106. noexcept(CoreCallbackState<T, F>(
  107. std::declval<Promise<T>&&>(),
  108. std::declval<F&&>()))) {
  109. return CoreCallbackState<T, F>(std::move(p), std::forward<F>(f));
  110. }
  111. template <typename T, typename R, typename... Args>
  112. auto makeCoreCallbackState(Promise<T>&& p, R (&f)(Args...)) noexcept {
  113. return CoreCallbackState<T, R (*)(Args...)>(std::move(p), &f);
  114. }
  115. template <class T>
  116. FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
  117. other.core_ = nullptr;
  118. }
  119. template <class T>
  120. FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
  121. other.core_ = nullptr;
  122. }
  123. template <class T>
  124. template <class T2, typename>
  125. FutureBase<T>::FutureBase(T2&& val)
  126. : core_(Core::make(Try<T>(std::forward<T2>(val)))) {}
  127. template <class T>
  128. template <typename T2>
  129. FutureBase<T>::FutureBase(
  130. typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
  131. : core_(Core::make(Try<T>(T()))) {}
  132. template <class T>
  133. template <
  134. class... Args,
  135. typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
  136. type>
  137. FutureBase<T>::FutureBase(in_place_t, Args&&... args)
  138. : core_(Core::make(in_place, std::forward<Args>(args)...)) {}
  139. template <class T>
  140. void FutureBase<T>::assign(FutureBase<T>&& other) noexcept {
  141. detach();
  142. core_ = exchange(other.core_, nullptr);
  143. }
  144. template <class T>
  145. FutureBase<T>::~FutureBase() {
  146. detach();
  147. }
  148. template <class T>
  149. T& FutureBase<T>::value() & {
  150. return result().value();
  151. }
  152. template <class T>
  153. T const& FutureBase<T>::value() const& {
  154. return result().value();
  155. }
  156. template <class T>
  157. T&& FutureBase<T>::value() && {
  158. return std::move(result().value());
  159. }
  160. template <class T>
  161. T const&& FutureBase<T>::value() const&& {
  162. return std::move(result().value());
  163. }
  164. template <class T>
  165. Try<T>& FutureBase<T>::result() & {
  166. return getCoreTryChecked();
  167. }
  168. template <class T>
  169. Try<T> const& FutureBase<T>::result() const& {
  170. return getCoreTryChecked();
  171. }
  172. template <class T>
  173. Try<T>&& FutureBase<T>::result() && {
  174. return std::move(getCoreTryChecked());
  175. }
  176. template <class T>
  177. Try<T> const&& FutureBase<T>::result() const&& {
  178. return std::move(getCoreTryChecked());
  179. }
  180. template <class T>
  181. bool FutureBase<T>::isReady() const {
  182. return getCore().hasResult();
  183. }
  184. template <class T>
  185. bool FutureBase<T>::hasValue() const {
  186. return result().hasValue();
  187. }
  188. template <class T>
  189. bool FutureBase<T>::hasException() const {
  190. return result().hasException();
  191. }
  192. template <class T>
  193. void FutureBase<T>::detach() {
  194. if (core_) {
  195. core_->detachFuture();
  196. core_ = nullptr;
  197. }
  198. }
  199. template <class T>
  200. void FutureBase<T>::throwIfInvalid() const {
  201. if (!core_) {
  202. throw_exception<FutureInvalid>();
  203. }
  204. }
  205. template <class T>
  206. void FutureBase<T>::throwIfContinued() const {
  207. if (!core_ || core_->hasCallback()) {
  208. throw_exception<FutureAlreadyContinued>();
  209. }
  210. }
  211. template <class T>
  212. Optional<Try<T>> FutureBase<T>::poll() {
  213. auto& core = getCore();
  214. return core.hasResult() ? Optional<Try<T>>(std::move(core.getTry()))
  215. : Optional<Try<T>>();
  216. }
  217. template <class T>
  218. void FutureBase<T>::raise(exception_wrapper exception) {
  219. getCore().raise(std::move(exception));
  220. }
  221. template <class T>
  222. template <class F>
  223. void FutureBase<T>::setCallback_(F&& func) {
  224. setCallback_(std::forward<F>(func), RequestContext::saveContext());
  225. }
  226. template <class T>
  227. template <class F>
  228. void FutureBase<T>::setCallback_(
  229. F&& func,
  230. std::shared_ptr<folly::RequestContext> context) {
  231. throwIfContinued();
  232. getCore().setCallback(std::forward<F>(func), std::move(context));
  233. }
  234. template <class T>
  235. FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
  236. : core_(nullptr) {}
  237. // MSVC 2017 Update 7 released with a bug that causes issues expanding to an
  238. // empty parameter pack when invoking a templated member function. It should
  239. // be fixed for MSVC 2017 Update 8.
  240. // TODO: Remove.
  241. namespace detail_msvc_15_7_workaround {
  242. template <typename R, std::size_t S>
  243. using IfArgsSizeIs = std::enable_if_t<R::Arg::ArgsSize::value == S, int>;
  244. template <typename R, typename State, typename T, IfArgsSizeIs<R, 0> = 0>
  245. decltype(auto) invoke(R, State& state, Try<T>& /* t */) {
  246. return state.invoke();
  247. }
  248. template <typename R, typename State, typename T, IfArgsSizeIs<R, 1> = 0>
  249. decltype(auto) invoke(R, State& state, Try<T>& t) {
  250. using Arg0 = typename R::Arg::ArgList::FirstArg;
  251. return state.invoke(t.template get<R::Arg::isTry(), Arg0>());
  252. }
  253. template <typename R, typename State, typename T, IfArgsSizeIs<R, 0> = 0>
  254. decltype(auto) tryInvoke(R, State& state, Try<T>& /* t */) {
  255. return state.tryInvoke();
  256. }
  257. template <typename R, typename State, typename T, IfArgsSizeIs<R, 1> = 0>
  258. decltype(auto) tryInvoke(R, State& state, Try<T>& t) {
  259. using Arg0 = typename R::Arg::ArgList::FirstArg;
  260. return state.tryInvoke(t.template get<R::Arg::isTry(), Arg0>());
  261. }
  262. } // namespace detail_msvc_15_7_workaround
  263. // then
  264. // Variant: returns a value
  265. // e.g. f.then([](Try<T>&& t){ return t.value(); });
  266. template <class T>
  267. template <typename F, typename R>
  268. typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
  269. FutureBase<T>::thenImplementation(F&& func, R) {
  270. static_assert(
  271. R::Arg::ArgsSize::value <= 1, "Then must take zero/one argument");
  272. typedef typename R::ReturnsFuture::Inner B;
  273. Promise<B> p;
  274. p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
  275. // grab the Future now before we lose our handle on the Promise
  276. auto sf = p.getSemiFuture();
  277. sf.setExecutor(this->getExecutor());
  278. auto f = Future<B>(sf.core_);
  279. sf.core_ = nullptr;
  280. /* This is a bit tricky.
  281. We can't just close over *this in case this Future gets moved. So we
  282. make a new dummy Future. We could figure out something more
  283. sophisticated that avoids making a new Future object when it can, as an
  284. optimization. But this is correct.
  285. core_ can't be moved, it is explicitly disallowed (as is copying). But
  286. if there's ever a reason to allow it, this is one place that makes that
  287. assumption and would need to be fixed. We use a standard shared pointer
  288. for core_ (by copying it in), which means in essence obj holds a shared
  289. pointer to itself. But this shouldn't leak because Promise will not
  290. outlive the continuation, because Promise will setException() with a
  291. broken Promise if it is destructed before completed. We could use a
  292. weak pointer but it would have to be converted to a shared pointer when
  293. func is executed (because the Future returned by func may possibly
  294. persist beyond the callback, if it gets moved), and so it is an
  295. optimization to just make it shared from the get-go.
  296. Two subtle but important points about this design. futures::detail::Core
  297. has no back pointers to Future or Promise, so if Future or Promise get
  298. moved (and they will be moved in performant code) we don't have to do
  299. anything fancy. And because we store the continuation in the
  300. futures::detail::Core, not in the Future, we can execute the continuation
  301. even after the Future has gone out of scope. This is an intentional design
  302. decision. It is likely we will want to be able to cancel a continuation
  303. in some circumstances, but I think it should be explicit not implicit
  304. in the destruction of the Future used to create it.
  305. */
  306. this->setCallback_(
  307. [state = futures::detail::makeCoreCallbackState(
  308. std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
  309. if (!R::Arg::isTry() && t.hasException()) {
  310. state.setException(std::move(t.exception()));
  311. } else {
  312. state.setTry(makeTryWith([&] {
  313. return detail_msvc_15_7_workaround::invoke(R{}, state, t);
  314. }));
  315. }
  316. });
  317. return f;
  318. }
  319. // Pass through a simple future as it needs no deferral adaptation
  320. template <class T>
  321. Future<T> chainExecutor(Executor*, Future<T>&& f) {
  322. return std::move(f);
  323. }
  324. // Correctly chain a SemiFuture for deferral
  325. template <class T>
  326. Future<T> chainExecutor(Executor* e, SemiFuture<T>&& f) {
  327. if (!e) {
  328. e = &InlineExecutor::instance();
  329. }
  330. return std::move(f).via(e);
  331. }
  332. // Variant: returns a Future
  333. // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
  334. template <class T>
  335. template <typename F, typename R>
  336. typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
  337. FutureBase<T>::thenImplementation(F&& func, R) {
  338. static_assert(
  339. R::Arg::ArgsSize::value <= 1, "Then must take zero/one argument");
  340. typedef typename R::ReturnsFuture::Inner B;
  341. Promise<B> p;
  342. p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
  343. // grab the Future now before we lose our handle on the Promise
  344. auto sf = p.getSemiFuture();
  345. auto* e = this->getExecutor();
  346. sf.setExecutor(e);
  347. auto f = Future<B>(sf.core_);
  348. sf.core_ = nullptr;
  349. this->setCallback_([state = futures::detail::makeCoreCallbackState(
  350. std::move(p), std::forward<F>(func))](
  351. Try<T>&& t) mutable {
  352. if (!R::Arg::isTry() && t.hasException()) {
  353. state.setException(std::move(t.exception()));
  354. } else {
  355. // Ensure that if function returned a SemiFuture we correctly chain
  356. // potential deferral.
  357. auto tf2 = detail_msvc_15_7_workaround::tryInvoke(R{}, state, t);
  358. if (tf2.hasException()) {
  359. state.setException(std::move(tf2.exception()));
  360. } else {
  361. auto statePromise = state.stealPromise();
  362. auto tf3 =
  363. chainExecutor(statePromise.core_->getExecutor(), *std::move(tf2));
  364. if (std::is_same<T, B>::value && statePromise.getCore().hasCallback()) {
  365. tf3.core_->setExecutor(statePromise.core_->getExecutor());
  366. auto callbackAndContext = statePromise.getCore().stealCallback();
  367. tf3.setCallback_(
  368. std::move(callbackAndContext.first),
  369. std::move(callbackAndContext.second));
  370. } else {
  371. tf3.setCallback_([p2 = std::move(statePromise)](Try<B>&& b) mutable {
  372. p2.setTry(std::move(b));
  373. });
  374. }
  375. }
  376. }
  377. });
  378. return f;
  379. }
  380. template <class T>
  381. template <typename E>
  382. SemiFuture<T>
  383. FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) && {
  384. struct Context {
  385. explicit Context(E ex) : exception(std::move(ex)) {}
  386. E exception;
  387. Future<Unit> thisFuture;
  388. Promise<T> promise;
  389. std::atomic<bool> token{false};
  390. };
  391. std::shared_ptr<Timekeeper> tks;
  392. if (LIKELY(!tk)) {
  393. tks = folly::detail::getTimekeeperSingleton();
  394. tk = tks.get();
  395. }
  396. if (UNLIKELY(!tk)) {
  397. return makeSemiFuture<T>(FutureNoTimekeeper());
  398. }
  399. auto ctx = std::make_shared<Context>(std::move(e));
  400. auto f = [ctx](Try<T>&& t) {
  401. if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
  402. ctx->promise.setTry(std::move(t));
  403. }
  404. };
  405. using R = futures::detail::callableResult<T, decltype(f)>;
  406. ctx->thisFuture = this->thenImplementation(std::move(f), R{});
  407. // Properly propagate interrupt values through futures chained after within()
  408. ctx->promise.setInterruptHandler(
  409. [weakCtx = to_weak_ptr(ctx)](const exception_wrapper& ex) {
  410. if (auto lockedCtx = weakCtx.lock()) {
  411. lockedCtx->thisFuture.raise(ex);
  412. }
  413. });
  414. // Have time keeper use a weak ptr to hold ctx,
  415. // so that ctx can be deallocated as soon as the future job finished.
  416. tk->after(dur).thenTry([weakCtx = to_weak_ptr(ctx)](Try<Unit>&& t) mutable {
  417. auto lockedCtx = weakCtx.lock();
  418. if (!lockedCtx) {
  419. // ctx already released. "this" completed first, cancel "after"
  420. return;
  421. }
  422. // "after" completed first, cancel "this"
  423. lockedCtx->thisFuture.raise(FutureTimeout());
  424. if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
  425. if (t.hasException()) {
  426. lockedCtx->promise.setException(std::move(t.exception()));
  427. } else {
  428. lockedCtx->promise.setException(std::move(lockedCtx->exception));
  429. }
  430. }
  431. });
  432. return ctx->promise.getSemiFuture();
  433. }
  /**
   * Defer work until executor is actively boosted.
   *
   * NOTE: this executor is a private implementation detail belonging to the
   * Folly Futures library and is not intended to be used elsewhere. It is
   * designed specifically for the use case of deferring work on a SemiFuture.
   * It is NOT thread safe. Please do not use for any other purpose without
   * great care.
   *
   * State transitions visible in this class:
   *   EMPTY -> HAS_FUNCTION   add() stored a function before an executor came
   *   EMPTY -> HAS_EXECUTOR   setExecutor() ran before any add()
   *   EMPTY -> DETACHED       detach() ran before any add()
   *   HAS_FUNCTION -> HAS_EXECUTOR / DETACHED   the stored function is
   *                           forwarded to the executor / discarded
   */
  class DeferredExecutor final : public Executor {
   public:
    // Runs `func` on the real executor if one is already set, drops it if
    // this executor is detached, and otherwise parks it in func_.
    void add(Func func) override {
      auto state = state_.load(std::memory_order_acquire);
      if (state == State::DETACHED) {
        return;
      }
      if (state == State::HAS_EXECUTOR) {
        executor_->add(std::move(func));
        return;
      }
      DCHECK(state == State::EMPTY);
      func_ = std::move(func);
      if (state_.compare_exchange_strong(
              state,
              State::HAS_FUNCTION,
              std::memory_order_release,
              std::memory_order_acquire)) {
        return;
      }
      // The exchange failed, so `state` now holds the value that was actually
      // observed: the parked function is either discarded or forwarded.
      DCHECK(state == State::DETACHED || state == State::HAS_EXECUTOR);
      if (state == State::DETACHED) {
        std::exchange(func_, nullptr);
        return;
      }
      executor_->add(std::exchange(func_, nullptr));
    }

    // Installs the real executor, first handing a copy to every nested
    // executor. If a function was parked by add(), it is forwarded now.
    void setExecutor(folly::Executor::KeepAlive<> executor) {
      if (nestedExecutors_) {
        auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
        for (auto& nestedExecutor : *nestedExecutors) {
          nestedExecutor->setExecutor(executor.copy());
        }
      }
      executor_ = std::move(executor);
      auto state = state_.load(std::memory_order_acquire);
      if (state == State::EMPTY &&
          state_.compare_exchange_strong(
              state,
              State::HAS_EXECUTOR,
              std::memory_order_release,
              std::memory_order_acquire)) {
        return;
      }
      DCHECK(state == State::HAS_FUNCTION);
      state_.store(State::HAS_EXECUTOR, std::memory_order_release);
      executor_->add(std::exchange(func_, nullptr));
    }

    // Marks this executor (and every nested executor) as detached. A function
    // parked by add() is discarded.
    void detach() {
      if (nestedExecutors_) {
        auto nestedExecutors = std::exchange(nestedExecutors_, nullptr);
        for (auto& nestedExecutor : *nestedExecutors) {
          nestedExecutor->detach();
        }
      }
      auto state = state_.load(std::memory_order_acquire);
      if (state == State::EMPTY &&
          state_.compare_exchange_strong(
              state,
              State::DETACHED,
              std::memory_order_release,
              std::memory_order_acquire)) {
        return;
      }
      DCHECK(state == State::HAS_FUNCTION);
      state_.store(State::DETACHED, std::memory_order_release);
      std::exchange(func_, nullptr);
    }

    // Records the executors that setExecutor()/detach() must propagate to.
    // May only be called while no nested executors are stored.
    void setNestedExecutors(
        std::vector<folly::Executor::KeepAlive<DeferredExecutor>> executors) {
      DCHECK(!nestedExecutors_);
      nestedExecutors_ = std::make_unique<
          std::vector<folly::Executor::KeepAlive<DeferredExecutor>>>(
          std::move(executors));
    }

    // Allocates a new executor and wraps it in a KeepAlive. The count starts
    // at 1 and keepAliveRelease() deletes the object when it drops to zero.
    static KeepAlive<DeferredExecutor> create() {
      return makeKeepAlive<DeferredExecutor>(new DeferredExecutor());
    }

   private:
    // Private: instances are only created through create().
    DeferredExecutor() {}

    bool keepAliveAcquire() override {
      auto keepAliveCount =
          keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
      DCHECK(keepAliveCount > 0);
      return true;
    }

    void keepAliveRelease() override {
      auto keepAliveCount =
          keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
      DCHECK(keepAliveCount > 0);
      // fetch_sub returns the previous value, so 1 means this was the last
      // reference.
      if (keepAliveCount == 1) {
        delete this;
      }
    }

    enum class State { EMPTY, HAS_FUNCTION, HAS_EXECUTOR, DETACHED };
    std::atomic<State> state_{State::EMPTY};
    // Function parked by add() while no executor is set.
    Func func_;
    // Executor installed by setExecutor().
    folly::Executor::KeepAlive<> executor_;
    // Executors that setExecutor()/detach() propagate to; reset to null once
    // either of them has run.
    std::unique_ptr<std::vector<folly::Executor::KeepAlive<DeferredExecutor>>>
        nestedExecutors_;
    std::atomic<ssize_t> keepAliveCount_{1};
  };
  544. class WaitExecutor final : public folly::Executor {
  545. public:
  546. void add(Func func) override {
  547. auto wQueue = queue_.wlock();
  548. if (wQueue->detached) {
  549. return;
  550. }
  551. bool empty = wQueue->funcs.empty();
  552. wQueue->funcs.push_back(std::move(func));
  553. if (empty) {
  554. baton_.post();
  555. }
  556. }
  557. void drive() {
  558. baton_.wait();
  559. baton_.reset();
  560. auto funcs = std::move(queue_.wlock()->funcs);
  561. for (auto& func : funcs) {
  562. std::exchange(func, nullptr)();
  563. }
  564. }
  565. using Clock = std::chrono::steady_clock;
  566. bool driveUntil(Clock::time_point deadline) {
  567. if (!baton_.try_wait_until(deadline)) {
  568. return false;
  569. }
  570. baton_.reset();
  571. auto funcs = std::move(queue_.wlock()->funcs);
  572. for (auto& func : funcs) {
  573. std::exchange(func, nullptr)();
  574. }
  575. return true;
  576. }
  577. void detach() {
  578. // Make sure we don't hold the lock while destroying funcs.
  579. [&] {
  580. auto wQueue = queue_.wlock();
  581. wQueue->detached = true;
  582. return std::move(wQueue->funcs);
  583. }();
  584. }
  585. static KeepAlive<WaitExecutor> create() {
  586. return makeKeepAlive<WaitExecutor>(new WaitExecutor());
  587. }
  588. private:
  589. WaitExecutor() {}
  590. bool keepAliveAcquire() override {
  591. auto keepAliveCount =
  592. keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
  593. DCHECK(keepAliveCount > 0);
  594. return true;
  595. }
  596. void keepAliveRelease() override {
  597. auto keepAliveCount =
  598. keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
  599. DCHECK(keepAliveCount > 0);
  600. if (keepAliveCount == 1) {
  601. delete this;
  602. }
  603. }
  604. struct Queue {
  605. std::vector<Func> funcs;
  606. bool detached{false};
  607. };
  608. folly::Synchronized<Queue> queue_;
  609. FutureBatonType baton_;
  610. std::atomic<ssize_t> keepAliveCount_{1};
  611. };
  // Stand-in for a vector when calling window(): window() otherwise expects a
  // real vector with `times` elements, which would be expensive for large
  // `times`. This type only remembers the element count and reports every
  // index as its own value.
  struct WindowFakeVector {
    using iterator = std::vector<size_t>::iterator;

    WindowFakeVector(size_t size) : count_{size} {}

    // Element i is simply i; no bounds check is performed.
    size_t operator[](const size_t index) const {
      return index;
    }

    // Number of elements given at construction.
    size_t size() const {
      return count_;
    }

   private:
    size_t count_;
  };
  627. } // namespace detail
  628. } // namespace futures
  629. template <class T>
  630. SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
  631. return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
  632. }
  633. // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
  634. template <class F>
  635. typename std::enable_if<
  636. isFutureOrSemiFuture<invoke_result_t<F>>::value,
  637. SemiFuture<typename invoke_result_t<F>::value_type>>::type
  638. makeSemiFutureWith(F&& func) {
  639. using InnerType = typename isFutureOrSemiFuture<invoke_result_t<F>>::Inner;
  640. try {
  641. return std::forward<F>(func)();
  642. } catch (std::exception& e) {
  643. return makeSemiFuture<InnerType>(
  644. exception_wrapper(std::current_exception(), e));
  645. } catch (...) {
  646. return makeSemiFuture<InnerType>(
  647. exception_wrapper(std::current_exception()));
  648. }
  649. }
  650. // makeSemiFutureWith(T()) -> SemiFuture<T>
  651. // makeSemiFutureWith(void()) -> SemiFuture<Unit>
  652. template <class F>
  653. typename std::enable_if<
  654. !(isFutureOrSemiFuture<invoke_result_t<F>>::value),
  655. SemiFuture<lift_unit_t<invoke_result_t<F>>>>::type
  656. makeSemiFutureWith(F&& func) {
  657. using LiftedResult = lift_unit_t<invoke_result_t<F>>;
  658. return makeSemiFuture<LiftedResult>(
  659. makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
  660. }
  661. template <class T>
  662. SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
  663. return makeSemiFuture(Try<T>(e));
  664. }
  665. template <class T>
  666. SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
  667. return makeSemiFuture(Try<T>(std::move(ew)));
  668. }
  669. template <class T, class E>
  670. typename std::
  671. enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
  672. makeSemiFuture(E const& e) {
  673. return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
  674. }
  675. template <class T>
  676. SemiFuture<T> makeSemiFuture(Try<T> t) {
  677. return SemiFuture<T>(SemiFuture<T>::Core::make(std::move(t)));
  678. }
  679. // This must be defined after the constructors to avoid a bug in MSVC
  680. // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
  681. inline SemiFuture<Unit> makeSemiFuture() {
  682. return makeSemiFuture(Unit{});
  683. }
  684. template <class T>
  685. SemiFuture<T> SemiFuture<T>::makeEmpty() {
  686. return SemiFuture<T>(futures::detail::EmptyConstruct{});
  687. }
  688. template <class T>
  689. typename SemiFuture<T>::DeferredExecutor* SemiFuture<T>::getDeferredExecutor()
  690. const {
  691. if (auto executor = this->getExecutor()) {
  692. assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
  693. return static_cast<DeferredExecutor*>(executor);
  694. }
  695. return nullptr;
  696. }
  697. template <class T>
  698. folly::Executor::KeepAlive<typename SemiFuture<T>::DeferredExecutor>
  699. SemiFuture<T>::stealDeferredExecutor() const {
  700. if (auto executor = this->getExecutor()) {
  701. assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
  702. auto executorKeepAlive =
  703. folly::getKeepAliveToken(static_cast<DeferredExecutor*>(executor));
  704. this->core_->setExecutor(nullptr);
  705. return executorKeepAlive;
  706. }
  707. return {};
  708. }
  709. template <class T>
  710. void SemiFuture<T>::releaseDeferredExecutor(Core* core) {
  711. if (!core) {
  712. return;
  713. }
  714. if (auto executor = core->getExecutor()) {
  715. assert(dynamic_cast<DeferredExecutor*>(executor) != nullptr);
  716. static_cast<DeferredExecutor*>(executor)->detach();
  717. core->setExecutor(nullptr);
  718. }
  719. }
  720. template <class T>
  721. SemiFuture<T>::~SemiFuture() {
  722. releaseDeferredExecutor(this->core_);
  723. }
  724. template <class T>
  725. SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
  726. : futures::detail::FutureBase<T>(std::move(other)) {}
  727. template <class T>
  728. SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
  729. : futures::detail::FutureBase<T>(std::move(other)) {
  730. // SemiFuture should not have an executor on construction
  731. if (this->core_) {
  732. this->setExecutor(nullptr);
  733. }
  734. }
  735. template <class T>
  736. SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
  737. releaseDeferredExecutor(this->core_);
  738. this->assign(std::move(other));
  739. return *this;
  740. }
  741. template <class T>
  742. SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
  743. releaseDeferredExecutor(this->core_);
  744. this->assign(std::move(other));
  745. // SemiFuture should not have an executor on construction
  746. if (this->core_) {
  747. this->setExecutor(nullptr);
  748. }
  749. return *this;
  750. }
  751. template <class T>
  752. Future<T> SemiFuture<T>::via(
  753. Executor::KeepAlive<> executor,
  754. int8_t priority) && {
  755. if (!executor) {
  756. throw_exception<FutureNoExecutor>();
  757. }
  758. if (auto deferredExecutor = getDeferredExecutor()) {
  759. deferredExecutor->setExecutor(executor.copy());
  760. }
  761. auto newFuture = Future<T>(this->core_);
  762. this->core_ = nullptr;
  763. newFuture.setExecutor(std::move(executor), priority);
  764. return newFuture;
  765. }
  766. template <class T>
  767. Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
  768. return std::move(*this).via(getKeepAliveToken(executor), priority);
  769. }
  770. template <class T>
  771. Future<T> SemiFuture<T>::toUnsafeFuture() && {
  772. return std::move(*this).via(&InlineExecutor::instance());
  773. }
  774. template <class T>
  775. template <typename F>
  776. SemiFuture<typename futures::detail::tryCallableResult<T, F>::value_type>
  777. SemiFuture<T>::defer(F&& func) && {
  778. DeferredExecutor* deferredExecutor = getDeferredExecutor();
  779. if (!deferredExecutor) {
  780. auto newDeferredExecutor = DeferredExecutor::create();
  781. deferredExecutor = newDeferredExecutor.get();
  782. this->setExecutor(std::move(newDeferredExecutor));
  783. }
  784. auto sf = Future<T>(this->core_).thenTry(std::forward<F>(func)).semi();
  785. this->core_ = nullptr;
  786. // Carry deferred executor through chain as constructor from Future will
  787. // nullify it
  788. sf.setExecutor(deferredExecutor);
  789. return sf;
  790. }
  791. template <class T>
  792. template <typename F>
  793. SemiFuture<typename futures::detail::valueCallableResult<T, F>::value_type>
  794. SemiFuture<T>::deferValue(F&& func) && {
  795. return std::move(*this).defer([f = std::forward<F>(func)](
  796. folly::Try<T>&& t) mutable {
  797. return std::forward<F>(f)(
  798. t.template get<
  799. false,
  800. typename futures::detail::valueCallableResult<T, F>::FirstArg>());
  801. });
  802. }
  803. template <class T>
  804. template <class ExceptionType, class F>
  805. SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
  806. return std::move(*this).defer(
  807. [func = std::forward<F>(func)](Try<T>&& t) mutable {
  808. if (auto e = t.template tryGetExceptionObject<ExceptionType>()) {
  809. return makeSemiFutureWith(
  810. [&]() mutable { return std::forward<F>(func)(*e); });
  811. } else {
  812. return makeSemiFuture<T>(std::move(t));
  813. }
  814. });
  815. }
  816. template <class T>
  817. template <class F>
  818. SemiFuture<T> SemiFuture<T>::deferError(F&& func) && {
  819. return std::move(*this).defer(
  820. [func = std::forward<F>(func)](Try<T> t) mutable {
  821. if (t.hasException()) {
  822. return makeSemiFutureWith([&]() mutable {
  823. return std::forward<F>(func)(std::move(t.exception()));
  824. });
  825. } else {
  826. return makeSemiFuture<T>(std::move(t));
  827. }
  828. });
  829. }
  830. template <typename T>
  831. SemiFuture<T> SemiFuture<T>::delayed(Duration dur, Timekeeper* tk) && {
  832. return collectAllSemiFuture(*this, futures::sleep(dur, tk))
  833. .toUnsafeFuture()
  834. .thenValue([](std::tuple<Try<T>, Try<Unit>> tup) {
  835. Try<T>& t = std::get<0>(tup);
  836. return makeFuture<T>(std::move(t));
  837. });
  838. }
  839. template <class T>
  840. Future<T> Future<T>::makeEmpty() {
  841. return Future<T>(futures::detail::EmptyConstruct{});
  842. }
  843. template <class T>
  844. Future<T>::Future(Future<T>&& other) noexcept
  845. : futures::detail::FutureBase<T>(std::move(other)) {}
  846. template <class T>
  847. Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
  848. this->assign(std::move(other));
  849. return *this;
  850. }
  851. template <class T>
  852. template <
  853. class T2,
  854. typename std::enable_if<
  855. !std::is_same<T, typename std::decay<T2>::type>::value &&
  856. std::is_constructible<T, T2&&>::value &&
  857. std::is_convertible<T2&&, T>::value,
  858. int>::type>
  859. Future<T>::Future(Future<T2>&& other)
  860. : Future(
  861. std::move(other).thenValue([](T2&& v) { return T(std::move(v)); })) {}
  862. template <class T>
  863. template <
  864. class T2,
  865. typename std::enable_if<
  866. !std::is_same<T, typename std::decay<T2>::type>::value &&
  867. std::is_constructible<T, T2&&>::value &&
  868. !std::is_convertible<T2&&, T>::value,
  869. int>::type>
  870. Future<T>::Future(Future<T2>&& other)
  871. : Future(
  872. std::move(other).thenValue([](T2&& v) { return T(std::move(v)); })) {}
  873. template <class T>
  874. template <
  875. class T2,
  876. typename std::enable_if<
  877. !std::is_same<T, typename std::decay<T2>::type>::value &&
  878. std::is_constructible<T, T2&&>::value,
  879. int>::type>
  880. Future<T>& Future<T>::operator=(Future<T2>&& other) {
  881. return operator=(
  882. std::move(other).thenValue([](T2&& v) { return T(std::move(v)); }));
  883. }
  884. // unwrap
  885. template <class T>
  886. template <class F>
  887. typename std::
  888. enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
  889. Future<T>::unwrap() && {
  890. return std::move(*this).thenValue(
  891. [](Future<typename isFuture<T>::Inner> internal_future) {
  892. return internal_future;
  893. });
  894. }
  895. template <class T>
  896. Future<T> Future<T>::via(Executor::KeepAlive<> executor, int8_t priority) && {
  897. this->setExecutor(std::move(executor), priority);
  898. auto newFuture = Future<T>(this->core_);
  899. this->core_ = nullptr;
  900. return newFuture;
  901. }
  902. template <class T>
  903. Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
  904. return std::move(*this).via(getKeepAliveToken(executor), priority);
  905. }
  906. template <class T>
  907. Future<T> Future<T>::via(Executor::KeepAlive<> executor, int8_t priority) & {
  908. this->throwIfInvalid();
  909. Promise<T> p;
  910. auto sf = p.getSemiFuture();
  911. auto func = [p = std::move(p)](Try<T>&& t) mutable {
  912. p.setTry(std::move(t));
  913. };
  914. using R = futures::detail::callableResult<T, decltype(func)>;
  915. this->thenImplementation(std::move(func), R{});
  916. // Construct future from semifuture manually because this may not have
  917. // an executor set due to legacy code. This means we can bypass the executor
  918. // check in SemiFuture::via
  919. auto f = Future<T>(sf.core_);
  920. sf.core_ = nullptr;
  921. return std::move(f).via(std::move(executor), priority);
  922. }
  923. template <class T>
  924. Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
  925. return via(getKeepAliveToken(executor), priority);
  926. }
  927. template <typename T>
  928. template <typename R, typename Caller, typename... Args>
  929. Future<typename isFuture<R>::Inner> Future<T>::then(
  930. R (Caller::*func)(Args...),
  931. Caller* instance) && {
  932. typedef typename std::remove_cv<typename std::remove_reference<
  933. typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
  934. FirstArg;
  935. return std::move(*this).thenTry([instance, func](Try<T>&& t) {
  936. return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
  937. });
  938. }
  939. template <class T>
  940. template <typename F>
  941. Future<typename futures::detail::tryCallableResult<T, F>::value_type>
  942. Future<T>::thenTry(F&& func) && {
  943. auto lambdaFunc = [f = std::forward<F>(func)](folly::Try<T>&& t) mutable {
  944. return std::forward<F>(f)(std::move(t));
  945. };
  946. using R = futures::detail::tryCallableResult<T, decltype(lambdaFunc)>;
  947. return this->thenImplementation(std::move(lambdaFunc), R{});
  948. }
  949. template <class T>
  950. template <typename F>
  951. Future<typename futures::detail::valueCallableResult<T, F>::value_type>
  952. Future<T>::thenValue(F&& func) && {
  953. auto lambdaFunc = [f = std::forward<F>(func)](folly::Try<T>&& t) mutable {
  954. return std::forward<F>(f)(
  955. t.template get<
  956. false,
  957. typename futures::detail::valueCallableResult<T, F>::FirstArg>());
  958. };
  959. using R = futures::detail::tryCallableResult<T, decltype(lambdaFunc)>;
  960. return this->thenImplementation(std::move(lambdaFunc), R{});
  961. }
  962. template <class T>
  963. template <class ExceptionType, class F>
  964. Future<T> Future<T>::thenError(F&& func) && {
  965. // Forward to onError but ensure that returned future carries the executor
  966. // Allow for applying to future with null executor while this is still
  967. // possible.
  968. auto* e = this->getExecutor();
  969. return std::move(*this)
  970. .onError([func = std::forward<F>(func)](ExceptionType& ex) mutable {
  971. return std::forward<F>(func)(ex);
  972. })
  973. .via(e ? e : &InlineExecutor::instance());
  974. }
  975. template <class T>
  976. template <class F>
  977. Future<T> Future<T>::thenError(F&& func) && {
  978. // Forward to onError but ensure that returned future carries the executor
  979. // Allow for applying to future with null executor while this is still
  980. // possible.
  981. auto* e = this->getExecutor();
  982. return std::move(*this)
  983. .onError([func = std::forward<F>(func)](
  984. folly::exception_wrapper&& ex) mutable {
  985. return std::forward<F>(func)(std::move(ex));
  986. })
  987. .via(e ? e : &InlineExecutor::instance());
  988. }
  989. template <class T>
  990. Future<Unit> Future<T>::then() && {
  991. return std::move(*this).thenValue([](T&&) {});
  992. }
  993. // onError where the callback returns T
  994. template <class T>
  995. template <class F>
  996. typename std::enable_if<
  997. !is_invocable<F, exception_wrapper>::value &&
  998. !futures::detail::Extract<F>::ReturnsFuture::value,
  999. Future<T>>::type
  1000. Future<T>::onError(F&& func) && {
  1001. typedef std::remove_reference_t<
  1002. typename futures::detail::Extract<F>::FirstArg>
  1003. Exn;
  1004. static_assert(
  1005. std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
  1006. "Return type of onError callback must be T or Future<T>");
  1007. Promise<T> p;
  1008. p.core_->setInterruptHandlerNoLock(this->getCore().getInterruptHandler());
  1009. auto sf = p.getSemiFuture();
  1010. this->setCallback_(
  1011. [state = futures::detail::makeCoreCallbackState(
  1012. std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
  1013. if (auto e = t.template tryGetExceptionObject<Exn>()) {
  1014. state.setTry(makeTryWith([&] { return state.invoke(*e); }));
  1015. } else {
  1016. state.setTry(std::move(t));
  1017. }
  1018. });
  1019. // Allow for applying to future with null executor while this is still
  1020. // possible.
  1021. // TODO(T26801487): Should have an executor
  1022. return std::move(sf).via(&InlineExecutor::instance());
  1023. }
  1024. // onError where the callback returns Future<T>
  1025. template <class T>
  1026. template <class F>
  1027. typename std::enable_if<
  1028. !is_invocable<F, exception_wrapper>::value &&
  1029. futures::detail::Extract<F>::ReturnsFuture::value,
  1030. Future<T>>::type
  1031. Future<T>::onError(F&& func) && {
  1032. static_assert(
  1033. std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
  1034. value,
  1035. "Return type of onError callback must be T or Future<T>");
  1036. typedef std::remove_reference_t<
  1037. typename futures::detail::Extract<F>::FirstArg>
  1038. Exn;
  1039. Promise<T> p;
  1040. auto sf = p.getSemiFuture();
  1041. this->setCallback_(
  1042. [state = futures::detail::makeCoreCallbackState(
  1043. std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
  1044. if (auto e = t.template tryGetExceptionObject<Exn>()) {
  1045. auto tf2 = state.tryInvoke(*e);
  1046. if (tf2.hasException()) {
  1047. state.setException(std::move(tf2.exception()));
  1048. } else {
  1049. tf2->setCallback_([p = state.stealPromise()](Try<T>&& t3) mutable {
  1050. p.setTry(std::move(t3));
  1051. });
  1052. }
  1053. } else {
  1054. state.setTry(std::move(t));
  1055. }
  1056. });
  1057. // Allow for applying to future with null executor while this is still
  1058. // possible.
  1059. // TODO(T26801487): Should have an executor
  1060. return std::move(sf).via(&InlineExecutor::instance());
  1061. }
  1062. template <class T>
  1063. template <class F>
  1064. Future<T> Future<T>::ensure(F&& func) && {
  1065. return std::move(*this).then(
  1066. [funcw = std::forward<F>(func)](Try<T>&& t) mutable {
  1067. std::forward<F>(funcw)();
  1068. return makeFuture(std::move(t));
  1069. });
  1070. }
  1071. template <class T>
  1072. template <class F>
  1073. Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) && {
  1074. return std::move(*this).within(dur, tk).template thenError<FutureTimeout>(
  1075. [funcw = std::forward<F>(func)](auto const&) mutable {
  1076. return std::forward<F>(funcw)();
  1077. });
  1078. }
  1079. template <class T>
  1080. template <class F>
  1081. typename std::enable_if<
  1082. is_invocable<F, exception_wrapper>::value &&
  1083. futures::detail::Extract<F>::ReturnsFuture::value,
  1084. Future<T>>::type
  1085. Future<T>::onError(F&& func) && {
  1086. static_assert(
  1087. std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
  1088. value,
  1089. "Return type of onError callback must be T or Future<T>");
  1090. Promise<T> p;
  1091. auto sf = p.getSemiFuture();
  1092. this->setCallback_(
  1093. [state = futures::detail::makeCoreCallbackState(
  1094. std::move(p), std::forward<F>(func))](Try<T> t) mutable {
  1095. if (t.hasException()) {
  1096. auto tf2 = state.tryInvoke(std::move(t.exception()));
  1097. if (tf2.hasException()) {
  1098. state.setException(std::move(tf2.exception()));
  1099. } else {
  1100. tf2->setCallback_([p = state.stealPromise()](Try<T>&& t3) mutable {
  1101. p.setTry(std::move(t3));
  1102. });
  1103. }
  1104. } else {
  1105. state.setTry(std::move(t));
  1106. }
  1107. });
  1108. // Allow for applying to future with null executor while this is still
  1109. // possible.
  1110. // TODO(T26801487): Should have an executor
  1111. return std::move(sf).via(&InlineExecutor::instance());
  1112. }
  1113. // onError(exception_wrapper) that returns T
  1114. template <class T>
  1115. template <class F>
  1116. typename std::enable_if<
  1117. is_invocable<F, exception_wrapper>::value &&
  1118. !futures::detail::Extract<F>::ReturnsFuture::value,
  1119. Future<T>>::type
  1120. Future<T>::onError(F&& func) && {
  1121. static_assert(
  1122. std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
  1123. value,
  1124. "Return type of onError callback must be T or Future<T>");
  1125. Promise<T> p;
  1126. auto sf = p.getSemiFuture();
  1127. this->setCallback_(
  1128. [state = futures::detail::makeCoreCallbackState(
  1129. std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
  1130. if (t.hasException()) {
  1131. state.setTry(makeTryWith(
  1132. [&] { return state.invoke(std::move(t.exception())); }));
  1133. } else {
  1134. state.setTry(std::move(t));
  1135. }
  1136. });
  1137. // Allow for applying to future with null executor while this is still
  1138. // possible.
  1139. // TODO(T26801487): Should have an executor
  1140. return std::move(sf).via(&InlineExecutor::instance());
  1141. }
  1142. template <class Func>
  1143. auto via(Executor* x, Func&& func) -> Future<
  1144. typename isFutureOrSemiFuture<decltype(std::declval<Func>()())>::Inner> {
  1145. // TODO make this actually more performant. :-P #7260175
  1146. return via(x).thenValue([f = std::forward<Func>(func)](auto&&) mutable {
  1147. return std::forward<Func>(f)();
  1148. });
  1149. }
  1150. template <class Func>
  1151. auto via(Executor::KeepAlive<> x, Func&& func) -> Future<
  1152. typename isFutureOrSemiFuture<decltype(std::declval<Func>()())>::Inner> {
  1153. return via(std::move(x))
  1154. .thenValue([f = std::forward<Func>(func)](auto&&) mutable {
  1155. return std::forward<Func>(f)();
  1156. });
  1157. }
  1158. // makeFuture
  1159. template <class T>
  1160. Future<typename std::decay<T>::type> makeFuture(T&& t) {
  1161. return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
  1162. }
  1163. inline Future<Unit> makeFuture() {
  1164. return makeFuture(Unit{});
  1165. }
  1166. // makeFutureWith(Future<T>()) -> Future<T>
  1167. template <class F>
  1168. typename std::
  1169. enable_if<isFuture<invoke_result_t<F>>::value, invoke_result_t<F>>::type
  1170. makeFutureWith(F&& func) {
  1171. using InnerType = typename isFuture<invoke_result_t<F>>::Inner;
  1172. try {
  1173. return std::forward<F>(func)();
  1174. } catch (std::exception& e) {
  1175. return makeFuture<InnerType>(
  1176. exception_wrapper(std::current_exception(), e));
  1177. } catch (...) {
  1178. return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
  1179. }
  1180. }
  1181. // makeFutureWith(T()) -> Future<T>
  1182. // makeFutureWith(void()) -> Future<Unit>
  1183. template <class F>
  1184. typename std::enable_if<
  1185. !(isFuture<invoke_result_t<F>>::value),
  1186. Future<lift_unit_t<invoke_result_t<F>>>>::type
  1187. makeFutureWith(F&& func) {
  1188. using LiftedResult = lift_unit_t<invoke_result_t<F>>;
  1189. return makeFuture<LiftedResult>(
  1190. makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
  1191. }
  1192. template <class T>
  1193. Future<T> makeFuture(std::exception_ptr const& e) {
  1194. return makeFuture(Try<T>(e));
  1195. }
  1196. template <class T>
  1197. Future<T> makeFuture(exception_wrapper ew) {
  1198. return makeFuture(Try<T>(std::move(ew)));
  1199. }
  1200. template <class T, class E>
  1201. typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::
  1202. type
  1203. makeFuture(E const& e) {
  1204. return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
  1205. }
  1206. template <class T>
  1207. Future<T> makeFuture(Try<T> t) {
  1208. return Future<T>(Future<T>::Core::make(std::move(t)));
  1209. }
  1210. // via
  1211. Future<Unit> via(Executor* executor, int8_t priority) {
  1212. return makeFuture().via(executor, priority);
  1213. }
  1214. Future<Unit> via(Executor::KeepAlive<> executor, int8_t priority) {
  1215. return makeFuture().via(std::move(executor), priority);
  1216. }
  1217. namespace futures {
  1218. namespace detail {
  1219. template <typename V, typename... Fs, std::size_t... Is>
  1220. FOLLY_ALWAYS_INLINE FOLLY_ATTR_VISIBILITY_HIDDEN void
  1221. foreach_(index_sequence<Is...>, V&& v, Fs&&... fs) {
  1222. using _ = int[];
  1223. void(_{0, (void(v(index_constant<Is>{}, static_cast<Fs&&>(fs))), 0)...});
  1224. }
  1225. template <typename V, typename... Fs>
  1226. FOLLY_ALWAYS_INLINE FOLLY_ATTR_VISIBILITY_HIDDEN void foreach(
  1227. V&& v,
  1228. Fs&&... fs) {
  1229. using _ = index_sequence_for<Fs...>;
  1230. foreach_(_{}, static_cast<V&&>(v), static_cast<Fs&&>(fs)...);
  1231. }
  1232. template <typename T>
  1233. DeferredExecutor* getDeferredExecutor(SemiFuture<T>& future) {
  1234. return future.getDeferredExecutor();
  1235. }
  1236. template <typename T>
  1237. folly::Executor::KeepAlive<DeferredExecutor> stealDeferredExecutor(
  1238. SemiFuture<T>& future) {
  1239. return future.stealDeferredExecutor();
  1240. }
  1241. template <typename T>
  1242. folly::Executor::KeepAlive<DeferredExecutor> stealDeferredExecutor(Future<T>&) {
  1243. return {};
  1244. }
  1245. template <typename... Ts>
  1246. void stealDeferredExecutorsVariadic(
  1247. std::vector<folly::Executor::KeepAlive<DeferredExecutor>>& executors,
  1248. Ts&... ts) {
  1249. auto foreach = [&](auto& future) {
  1250. if (auto executor = stealDeferredExecutor(future)) {
  1251. executors.push_back(std::move(executor));
  1252. }
  1253. return folly::unit;
  1254. };
  1255. [](...) {}(foreach(ts)...);
  1256. }
  1257. template <class InputIterator>
  1258. void stealDeferredExecutors(
  1259. std::vector<folly::Executor::KeepAlive<DeferredExecutor>>& executors,
  1260. InputIterator first,
  1261. InputIterator last) {
  1262. for (auto it = first; it != last; ++it) {
  1263. if (auto executor = stealDeferredExecutor(*it)) {
  1264. executors.push_back(std::move(executor));
  1265. }
  1266. }
  1267. }
  1268. } // namespace detail
  1269. } // namespace futures
  1270. // collectAll (variadic)
  1271. template <typename... Fs>
  1272. SemiFuture<std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>>
  1273. collectAllSemiFuture(Fs&&... fs) {
  1274. using Result = std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>;
  1275. struct Context {
  1276. ~Context() {
  1277. p.setValue(std::move(results));
  1278. }
  1279. Promise<Result> p;
  1280. Result results;
  1281. };
  1282. std::vector<folly::Executor::KeepAlive<futures::detail::DeferredExecutor>>
  1283. executors;
  1284. futures::detail::stealDeferredExecutorsVariadic(executors, fs...);
  1285. auto ctx = std::make_shared<Context>();
  1286. futures::detail::foreach(
  1287. [&](auto i, auto&& f) {
  1288. f.setCallback_([i, ctx](auto&& t) {
  1289. std::get<i.value>(ctx->results) = std::move(t);
  1290. });
  1291. },
  1292. static_cast<Fs&&>(fs)...);
  1293. auto future = ctx->p.getSemiFuture();
  1294. if (!executors.empty()) {
  1295. auto work = [](Try<typename decltype(future)::value_type>&& t) {
  1296. return std::move(t).value();
  1297. };
  1298. future = std::move(future).defer(work);
  1299. auto deferredExecutor = futures::detail::getDeferredExecutor(future);
  1300. deferredExecutor->setNestedExecutors(std::move(executors));
  1301. }
  1302. return future;
  1303. }
  1304. template <typename... Fs>
  1305. Future<std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...>> collectAll(
  1306. Fs&&... fs) {
  1307. return collectAllSemiFuture(std::forward<Fs>(fs)...).toUnsafeFuture();
  1308. }
  1309. // collectAll (iterator)
  1310. template <class InputIterator>
  1311. SemiFuture<std::vector<
  1312. Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
  1313. collectAllSemiFuture(InputIterator first, InputIterator last) {
  1314. using F = typename std::iterator_traits<InputIterator>::value_type;
  1315. using T = typename F::value_type;
  1316. struct Context {
  1317. explicit Context(size_t n) : results(n) {}
  1318. ~Context() {
  1319. p.setValue(std::move(results));
  1320. }
  1321. Promise<std::vector<Try<T>>> p;
  1322. std::vector<Try<T>> results;
  1323. };
  1324. std::vector<folly::Executor::KeepAlive<futures::detail::DeferredExecutor>>
  1325. executors;
  1326. futures::detail::stealDeferredExecutors(executors, first, last);
  1327. auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
  1328. for (size_t i = 0; first != last; ++first, ++i) {
  1329. first->setCallback_(
  1330. [i, ctx](Try<T>&& t) { ctx->results[i] = std::move(t); });
  1331. }
  1332. auto future = ctx->p.getSemiFuture();
  1333. if (!executors.empty()) {
  1334. future = std::move(future).defer(
  1335. [](Try<typename decltype(future)::value_type>&& t) {
  1336. return std::move(t).value();
  1337. });
  1338. auto deferredExecutor = futures::detail::getDeferredExecutor(future);
  1339. deferredExecutor->setNestedExecutors(std::move(executors));
  1340. }
  1341. return future;
  1342. }
  1343. template <class InputIterator>
  1344. Future<std::vector<
  1345. Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
  1346. collectAll(InputIterator first, InputIterator last) {
  1347. return collectAllSemiFuture(first, last).toUnsafeFuture();
  1348. }
  1349. // collect (iterator)
  1350. // TODO(T26439406): Make return SemiFuture
  1351. template <class InputIterator>
  1352. Future<std::vector<
  1353. typename std::iterator_traits<InputIterator>::value_type::value_type>>
  1354. collect(InputIterator first, InputIterator last) {
  1355. using F = typename std::iterator_traits<InputIterator>::value_type;
  1356. using T = typename F::value_type;
  1357. struct Context {
  1358. explicit Context(size_t n) : result(n) {
  1359. finalResult.reserve(n);
  1360. }
  1361. ~Context() {
  1362. if (!threw.load(std::memory_order_relaxed)) {
  1363. // map Optional<T> -> T
  1364. std::transform(
  1365. result.begin(),
  1366. result.end(),
  1367. std::back_inserter(finalResult),
  1368. [](Optional<T>& o) { return std::move(o.value()); });
  1369. p.setValue(std::move(finalResult));
  1370. }
  1371. }
  1372. Promise<std::vector<T>> p;
  1373. std::vector<Optional<T>> result;
  1374. std::vector<T> finalResult;
  1375. std::atomic<bool> threw{false};
  1376. };
  1377. auto ctx = std::make_shared<Context>(std::distance(first, last));
  1378. for (size_t i = 0; first != last; ++first, ++i) {
  1379. first->setCallback_([i, ctx](Try<T>&& t) {
  1380. if (t.hasException()) {
  1381. if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
  1382. ctx->p.setException(std::move(t.exception()));
  1383. }
  1384. } else if (!ctx->threw.load(std::memory_order_relaxed)) {
  1385. ctx->result[i] = std::move(t.value());
  1386. }
  1387. });
  1388. }
  1389. return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
  1390. }
  1391. // collect (variadic)
  1392. // TODO(T26439406): Make return SemiFuture
  1393. template <typename... Fs>
  1394. Future<std::tuple<typename remove_cvref_t<Fs>::value_type...>> collect(
  1395. Fs&&... fs) {
  1396. using Result = std::tuple<typename remove_cvref_t<Fs>::value_type...>;
  1397. struct Context {
  1398. ~Context() {
  1399. if (!threw.load(std::memory_order_relaxed)) {
  1400. p.setValue(unwrapTryTuple(std::move(results)));
  1401. }
  1402. }
  1403. Promise<Result> p;
  1404. std::tuple<Try<typename remove_cvref_t<Fs>::value_type>...> results;
  1405. std::atomic<bool> threw{false};
  1406. };
  1407. auto ctx = std::make_shared<Context>();
  1408. futures::detail::foreach(
  1409. [&](auto i, auto&& f) {
  1410. f.setCallback_([i, ctx](auto&& t) {
  1411. if (t.hasException()) {
  1412. if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
  1413. ctx->p.setException(std::move(t.exception()));
  1414. }
  1415. } else if (!ctx->threw.load(std::memory_order_relaxed)) {
  1416. std::get<i.value>(ctx->results) = std::move(t);
  1417. }
  1418. });
  1419. },
  1420. static_cast<Fs&&>(fs)...);
  1421. return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
  1422. }
  1423. // collectAny (iterator)
  1424. // TODO(T26439406): Make return SemiFuture
  1425. template <class InputIterator>
  1426. Future<std::pair<
  1427. size_t,
  1428. Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
  1429. collectAny(InputIterator first, InputIterator last) {
  1430. using F = typename std::iterator_traits<InputIterator>::value_type;
  1431. using T = typename F::value_type;
  1432. struct Context {
  1433. Promise<std::pair<size_t, Try<T>>> p;
  1434. std::atomic<bool> done{false};
  1435. };
  1436. auto ctx = std::make_shared<Context>();
  1437. for (size_t i = 0; first != last; ++first, ++i) {
  1438. first->setCallback_([i, ctx](Try<T>&& t) {
  1439. if (!ctx->done.exchange(true, std::memory_order_relaxed)) {
  1440. ctx->p.setValue(std::make_pair(i, std::move(t)));
  1441. }
  1442. });
  1443. }
  1444. return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
  1445. }
  1446. // collectAnyWithoutException (iterator)
  1447. // TODO(T26439406): Make return SemiFuture
  1448. template <class InputIterator>
  1449. Future<std::pair<
  1450. size_t,
  1451. typename std::iterator_traits<InputIterator>::value_type::value_type>>
  1452. collectAnyWithoutException(InputIterator first, InputIterator last) {
  1453. using F = typename std::iterator_traits<InputIterator>::value_type;
  1454. using T = typename F::value_type;
  1455. struct Context {
  1456. Context(size_t n) : nTotal(n) {}
  1457. Promise<std::pair<size_t, T>> p;
  1458. std::atomic<bool> done{false};
  1459. std::atomic<size_t> nFulfilled{0};
  1460. size_t nTotal;
  1461. };
  1462. auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
  1463. for (size_t i = 0; first != last; ++first, ++i) {
  1464. first->setCallback_([i, ctx](Try<T>&& t) {
  1465. if (!t.hasException() &&
  1466. !ctx->done.exchange(true, std::memory_order_relaxed)) {
  1467. ctx->p.setValue(std::make_pair(i, std::move(t.value())));
  1468. } else if (
  1469. ctx->nFulfilled.fetch_add(1, std::memory_order_relaxed) + 1 ==
  1470. ctx->nTotal) {
  1471. ctx->p.setException(t.exception());
  1472. }
  1473. });
  1474. }
  1475. return ctx->p.getSemiFuture().via(&InlineExecutor::instance());
  1476. }
  1477. // collectN (iterator)
  1478. template <class InputIterator>
  1479. SemiFuture<std::vector<std::pair<
  1480. size_t,
  1481. Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
  1482. collectN(InputIterator first, InputIterator last, size_t n) {
  1483. using F = typename std::iterator_traits<InputIterator>::value_type;
  1484. using T = typename F::value_type;
  1485. using Result = std::vector<std::pair<size_t, Try<T>>>;
  1486. struct Context {
  1487. explicit Context(size_t numFutures, size_t min_)
  1488. : v(numFutures), min(min_) {}
  1489. std::vector<Optional<Try<T>>> v;
  1490. size_t min;
  1491. std::atomic<size_t> completed = {0}; // # input futures completed
  1492. std::atomic<size_t> stored = {0}; // # output values stored
  1493. Promise<Result> p;
  1494. };
  1495. assert(n > 0);
  1496. assert(std::distance(first, last) >= 0);
  1497. if (size_t(std::distance(first, last)) < n) {
  1498. return SemiFuture<Result>(
  1499. exception_wrapper(std::runtime_error("Not enough futures")));
  1500. }
  1501. // for each completed Future, increase count and add to vector, until we
  1502. // have n completed futures at which point we fulfil our Promise with the
  1503. // vector
  1504. auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)), n);
  1505. for (size_t i = 0; first != last; ++first, ++i) {
  1506. first->setCallback_([i, ctx](Try<T>&& t) {
  1507. // relaxed because this guards control but does not guard data
  1508. auto const c = 1 + ctx->completed.fetch_add(1, std::memory_order_relaxed);
  1509. if (c > ctx->min) {
  1510. return;
  1511. }
  1512. ctx->v[i] = std::move(t);
  1513. // release because the stored values in all threads must be visible below
  1514. // acquire because no stored value is permitted to be fetched early
  1515. auto const s = 1 + ctx->stored.fetch_add(1, std::memory_order_acq_rel);
  1516. if (s < ctx->min) {
  1517. return;
  1518. }
  1519. Result result;
  1520. result.reserve(ctx->completed.load());
  1521. for (size_t j = 0; j < ctx->v.size(); ++j) {
  1522. auto& entry = ctx->v[j];
  1523. if (entry.hasValue()) {
  1524. result.emplace_back(j, std::move(entry).value());
  1525. }
  1526. }
  1527. ctx->p.setTry(Try<Result>(std::move(result)));
  1528. });
  1529. }
  1530. return ctx->p.getSemiFuture();
  1531. }
  1532. // reduce (iterator)
  1533. template <class It, class T, class F>
  1534. Future<T> reduce(It first, It last, T&& initial, F&& func) {
  1535. if (first == last) {
  1536. return makeFuture(std::forward<T>(initial));
  1537. }
  1538. typedef typename std::iterator_traits<It>::value_type::value_type ItT;
  1539. typedef typename std::
  1540. conditional<is_invocable<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type
  1541. Arg;
  1542. typedef isTry<Arg> IsTry;
  1543. auto sfunc = std::make_shared<std::decay_t<F>>(std::forward<F>(func));
  1544. auto f = std::move(*first).thenTry(
  1545. [initial = std::forward<T>(initial), sfunc](Try<ItT>&& head) mutable {
  1546. return (*sfunc)(
  1547. std::move(initial), head.template get<IsTry::value, Arg&&>());
  1548. });
  1549. for (++first; first != last; ++first) {
  1550. f = collectAllSemiFuture(f, *first).toUnsafeFuture().thenValue(
  1551. [sfunc](std::tuple<Try<T>, Try<ItT>>&& t) {
  1552. return (*sfunc)(
  1553. std::move(std::get<0>(t).value()),
  1554. // Either return a ItT&& or a Try<ItT>&& depending
  1555. // on the type of the argument of func.
  1556. std::get<1>(t).template get<IsTry::value, Arg&&>());
  1557. });
  1558. }
  1559. return f;
  1560. }
  1561. // window (collection)
  1562. template <class Collection, class F, class ItT, class Result>
  1563. std::vector<Future<Result>> window(Collection input, F func, size_t n) {
  1564. // Use global QueuedImmediateExecutor singleton to avoid stack overflow.
  1565. auto executor = &QueuedImmediateExecutor::instance();
  1566. return window(executor, std::move(input), std::move(func), n);
  1567. }
  1568. template <class F>
  1569. auto window(size_t times, F func, size_t n)
  1570. -> std::vector<invoke_result_t<F, size_t>> {
  1571. return window(futures::detail::WindowFakeVector(times), std::move(func), n);
  1572. }
  1573. template <class Collection, class F, class ItT, class Result>
  1574. std::vector<Future<Result>>
  1575. window(Executor* executor, Collection input, F func, size_t n) {
  1576. return window(
  1577. getKeepAliveToken(executor), std::move(input), std::move(func), n);
  1578. }
  1579. template <class Collection, class F, class ItT, class Result>
  1580. std::vector<Future<Result>>
  1581. window(Executor::KeepAlive<> executor, Collection input, F func, size_t n) {
  1582. struct WindowContext {
  1583. WindowContext(
  1584. Executor::KeepAlive<> executor_,
  1585. Collection&& input_,
  1586. F&& func_)
  1587. : executor(std::move(executor_)),
  1588. input(std::move(input_)),
  1589. promises(input.size()),
  1590. func(std::move(func_)) {}
  1591. std::atomic<size_t> i{0};
  1592. Executor::KeepAlive<> executor;
  1593. Collection input;
  1594. std::vector<Promise<Result>> promises;
  1595. F func;
  1596. static void spawn(std::shared_ptr<WindowContext> ctx) {
  1597. size_t i = ctx->i.fetch_add(1, std::memory_order_relaxed);
  1598. if (i < ctx->input.size()) {
  1599. auto fut = makeSemiFutureWith(
  1600. [&] { return ctx->func(std::move(ctx->input[i])); });
  1601. fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
  1602. ctx->executor->add(
  1603. [ctx = std::move(ctx), i, t = std::move(t)]() mutable {
  1604. ctx->promises[i].setTry(std::move(t));
  1605. // Chain another future onto this one
  1606. spawn(std::move(ctx));
  1607. });
  1608. });
  1609. }
  1610. }
  1611. };
  1612. auto max = std::min(n, input.size());
  1613. auto ctx = std::make_shared<WindowContext>(
  1614. executor.copy(), std::move(input), std::move(func));
  1615. // Start the first n Futures
  1616. for (size_t i = 0; i < max; ++i) {
  1617. executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
  1618. }
  1619. std::vector<Future<Result>> futures;
  1620. futures.reserve(ctx->promises.size());
  1621. for (auto& promise : ctx->promises) {
  1622. futures.emplace_back(promise.getSemiFuture().via(executor.copy()));
  1623. }
  1624. return futures;
  1625. }
  1626. // reduce
  1627. template <class T>
  1628. template <class I, class F>
  1629. Future<I> Future<T>::reduce(I&& initial, F&& func) && {
  1630. return std::move(*this).thenValue(
  1631. [minitial = std::forward<I>(initial),
  1632. mfunc = std::forward<F>(func)](T&& vals) mutable {
  1633. auto ret = std::move(minitial);
  1634. for (auto& val : vals) {
  1635. ret = mfunc(std::move(ret), std::move(val));
  1636. }
  1637. return ret;
  1638. });
  1639. }
  1640. // unorderedReduce (iterator)
  1641. // TODO(T26439406): Make return SemiFuture
  1642. template <class It, class T, class F>
  1643. Future<T> unorderedReduce(It first, It last, T initial, F func) {
  1644. using ItF = typename std::iterator_traits<It>::value_type;
  1645. using ItT = typename ItF::value_type;
  1646. using Arg = MaybeTryArg<F, T, ItT>;
  1647. if (first == last) {
  1648. return makeFuture(std::move(initial));
  1649. }
  1650. typedef isTry<Arg> IsTry;
  1651. struct Context {
  1652. Context(T&& memo, F&& fn, size_t n)
  1653. : lock_(),
  1654. memo_(makeFuture<T>(std::move(memo))),
  1655. func_(std::move(fn)),
  1656. numThens_(0),
  1657. numFutures_(n),
  1658. promise_() {}
  1659. folly::MicroSpinLock lock_; // protects memo_ and numThens_
  1660. Future<T> memo_;
  1661. F func_;
  1662. size_t numThens_; // how many Futures completed and called .then()
  1663. size_t numFutures_; // how many Futures in total
  1664. Promise<T> promise_;
  1665. };
  1666. struct Fulfill {
  1667. void operator()(Promise<T>&& p, T&& v) const {
  1668. p.setValue(std::move(v));
  1669. }
  1670. void operator()(Promise<T>&& p, Future<T>&& f) const {
  1671. f.setCallback_(
  1672. [p = std::move(p)](Try<T>&& t) mutable { p.setTry(std::move(t)); });
  1673. }
  1674. };
  1675. auto ctx = std::make_shared<Context>(
  1676. std::move(initial), std::move(func), std::distance(first, last));
  1677. for (size_t i = 0; first != last; ++first, ++i) {
  1678. first->setCallback_([i, ctx](Try<ItT>&& t) {
  1679. (void)i;
  1680. // Futures can be completed in any order, simultaneously.
  1681. // To make this non-blocking, we create a new Future chain in
  1682. // the order of completion to reduce the values.
  1683. // The spinlock just protects chaining a new Future, not actually
  1684. // executing the reduce, which should be really fast.
  1685. Promise<T> p;
  1686. auto f = p.getFuture();
  1687. {
  1688. folly::MSLGuard lock(ctx->lock_);
  1689. f = exchange(ctx->memo_, std::move(f));
  1690. if (++ctx->numThens_ == ctx->numFutures_) {
  1691. // After reducing the value of the last Future, fulfill the Promise
  1692. ctx->memo_.setCallback_(
  1693. [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
  1694. }
  1695. }
  1696. f.setCallback_(
  1697. [ctx, mp = std::move(p), mt = std::move(t)](Try<T>&& v) mutable {
  1698. if (v.hasValue()) {
  1699. try {
  1700. Fulfill{}(
  1701. std::move(mp),
  1702. ctx->func_(
  1703. std::move(v.value()),
  1704. mt.template get<IsTry::value, Arg&&>()));
  1705. } catch (std::exception& e) {
  1706. mp.setException(exception_wrapper(std::current_exception(), e));
  1707. } catch (...) {
  1708. mp.setException(exception_wrapper(std::current_exception()));
  1709. }
  1710. } else {
  1711. mp.setTry(std::move(v));
  1712. }
  1713. });
  1714. });
  1715. }
  1716. return ctx->promise_.getSemiFuture().via(&InlineExecutor::instance());
  1717. }
  1718. // within
  1719. template <class T>
  1720. Future<T> Future<T>::within(Duration dur, Timekeeper* tk) && {
  1721. return std::move(*this).within(dur, FutureTimeout(), tk);
  1722. }
  1723. template <class T>
  1724. template <class E>
  1725. Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) && {
  1726. if (this->isReady()) {
  1727. return std::move(*this);
  1728. }
  1729. auto* exe = this->getExecutor();
  1730. return std::move(*this)
  1731. .withinImplementation(dur, e, tk)
  1732. .via(exe ? exe : &InlineExecutor::instance());
  1733. }
  1734. // delayed
  1735. template <class T>
  1736. Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) && {
  1737. auto e = this->getExecutor();
  1738. return collectAllSemiFuture(*this, futures::sleep(dur, tk))
  1739. .via(e ? e : &InlineExecutor::instance())
  1740. .thenValue([](std::tuple<Try<T>, Try<Unit>>&& tup) {
  1741. return makeFuture<T>(std::get<0>(std::move(tup)));
  1742. });
  1743. }
  1744. template <class T>
  1745. Future<T> Future<T>::delayedUnsafe(Duration dur, Timekeeper* tk) {
  1746. return std::move(*this).semi().delayed(dur, tk).toUnsafeFuture();
  1747. }
  1748. namespace futures {
  1749. namespace detail {
  1750. template <class FutureType, typename T = typename FutureType::value_type>
  1751. void waitImpl(FutureType& f) {
  1752. if (std::is_base_of<Future<T>, FutureType>::value) {
  1753. f = std::move(f).via(&InlineExecutor::instance());
  1754. }
  1755. // short-circuit if there's nothing to do
  1756. if (f.isReady()) {
  1757. return;
  1758. }
  1759. Promise<T> promise;
  1760. auto ret = promise.getSemiFuture();
  1761. auto baton = std::make_shared<FutureBatonType>();
  1762. f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
  1763. promise.setTry(std::move(t));
  1764. baton->post();
  1765. });
  1766. convertFuture(std::move(ret), f);
  1767. baton->wait();
  1768. assert(f.isReady());
  1769. }
  1770. template <class T>
  1771. void convertFuture(SemiFuture<T>&& sf, Future<T>& f) {
  1772. // Carry executor from f, inserting an inline executor if it did not have one
  1773. auto* exe = f.getExecutor();
  1774. f = std::move(sf).via(exe ? exe : &InlineExecutor::instance());
  1775. }
  1776. template <class T>
  1777. void convertFuture(SemiFuture<T>&& sf, SemiFuture<T>& f) {
  1778. f = std::move(sf);
  1779. }
  1780. template <class FutureType, typename T = typename FutureType::value_type>
  1781. void waitImpl(FutureType& f, Duration dur) {
  1782. if (std::is_base_of<Future<T>, FutureType>::value) {
  1783. f = std::move(f).via(&InlineExecutor::instance());
  1784. }
  1785. // short-circuit if there's nothing to do
  1786. if (f.isReady()) {
  1787. return;
  1788. }
  1789. Promise<T> promise;
  1790. auto ret = promise.getSemiFuture();
  1791. auto baton = std::make_shared<FutureBatonType>();
  1792. f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
  1793. promise.setTry(std::move(t));
  1794. baton->post();
  1795. });
  1796. convertFuture(std::move(ret), f);
  1797. if (baton->try_wait_for(dur)) {
  1798. assert(f.isReady());
  1799. }
  1800. }
  1801. template <class T>
  1802. void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
  1803. // Set callback so to ensure that the via executor has something on it
  1804. // so that once the preceding future triggers this callback, drive will
  1805. // always have a callback to satisfy it
  1806. if (f.isReady()) {
  1807. return;
  1808. }
  1809. f = std::move(f).via(e).thenValue([](T&& t) { return std::move(t); });
  1810. while (!f.isReady()) {
  1811. e->drive();
  1812. }
  1813. assert(f.isReady());
  1814. f = std::move(f).via(&InlineExecutor::instance());
  1815. }
  1816. template <class T, typename Rep, typename Period>
  1817. void waitViaImpl(
  1818. Future<T>& f,
  1819. TimedDrivableExecutor* e,
  1820. const std::chrono::duration<Rep, Period>& timeout) {
  1821. // Set callback so to ensure that the via executor has something on it
  1822. // so that once the preceding future triggers this callback, drive will
  1823. // always have a callback to satisfy it
  1824. if (f.isReady()) {
  1825. return;
  1826. }
  1827. // Chain operations, ensuring that the executor is kept alive for the duration
  1828. f = std::move(f).via(e).thenValue(
  1829. [keepAlive = getKeepAliveToken(e)](T&& t) { return std::move(t); });
  1830. auto now = std::chrono::steady_clock::now();
  1831. auto deadline = now + timeout;
  1832. while (!f.isReady() && (now < deadline)) {
  1833. e->try_drive_until(deadline);
  1834. now = std::chrono::steady_clock::now();
  1835. }
  1836. assert(f.isReady() || (now >= deadline));
  1837. if (f.isReady()) {
  1838. f = std::move(f).via(&InlineExecutor::instance());
  1839. }
  1840. }
  1841. } // namespace detail
  1842. } // namespace futures
  1843. template <class T>
  1844. SemiFuture<T>& SemiFuture<T>::wait() & {
  1845. if (auto deferredExecutor = getDeferredExecutor()) {
  1846. // Make sure that the last callback in the future chain will be run on the
  1847. // WaitExecutor.
  1848. Promise<T> promise;
  1849. auto ret = promise.getSemiFuture();
  1850. setCallback_(
  1851. [p = std::move(promise)](auto&& r) mutable { p.setTry(std::move(r)); });
  1852. auto waitExecutor = futures::detail::WaitExecutor::create();
  1853. deferredExecutor->setExecutor(waitExecutor.copy());
  1854. while (!ret.isReady()) {
  1855. waitExecutor->drive();
  1856. }
  1857. waitExecutor->detach();
  1858. this->detach();
  1859. *this = std::move(ret);
  1860. } else {
  1861. futures::detail::waitImpl(*this);
  1862. }
  1863. return *this;
  1864. }
  1865. template <class T>
  1866. SemiFuture<T>&& SemiFuture<T>::wait() && {
  1867. return std::move(wait());
  1868. }
  1869. template <class T>
  1870. SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
  1871. if (auto deferredExecutor = getDeferredExecutor()) {
  1872. // Make sure that the last callback in the future chain will be run on the
  1873. // WaitExecutor.
  1874. Promise<T> promise;
  1875. auto ret = promise.getSemiFuture();
  1876. setCallback_(
  1877. [p = std::move(promise)](auto&& r) mutable { p.setTry(std::move(r)); });
  1878. auto waitExecutor = futures::detail::WaitExecutor::create();
  1879. auto deadline = futures::detail::WaitExecutor::Clock::now() + dur;
  1880. deferredExecutor->setExecutor(waitExecutor.copy());
  1881. while (!ret.isReady()) {
  1882. if (!waitExecutor->driveUntil(deadline)) {
  1883. break;
  1884. }
  1885. }
  1886. waitExecutor->detach();
  1887. this->detach();
  1888. *this = std::move(ret);
  1889. } else {
  1890. futures::detail::waitImpl(*this, dur);
  1891. }
  1892. return *this;
  1893. }
  1894. template <class T>
  1895. bool SemiFuture<T>::wait(Duration dur) && {
  1896. auto future = std::move(*this);
  1897. future.wait(dur);
  1898. return future.isReady();
  1899. }
  1900. template <class T>
  1901. T SemiFuture<T>::get() && {
  1902. return std::move(*this).getTry().value();
  1903. }
  1904. template <class T>
  1905. T SemiFuture<T>::get(Duration dur) && {
  1906. return std::move(*this).getTry(dur).value();
  1907. }
  1908. template <class T>
  1909. Try<T> SemiFuture<T>::getTry() && {
  1910. wait();
  1911. auto future = folly::Future<T>(this->core_);
  1912. this->core_ = nullptr;
  1913. return std::move(std::move(future).getTry());
  1914. }
  1915. template <class T>
  1916. Try<T> SemiFuture<T>::getTry(Duration dur) && {
  1917. wait(dur);
  1918. auto future = folly::Future<T>(this->core_);
  1919. this->core_ = nullptr;
  1920. if (!future.isReady()) {
  1921. throw_exception<FutureTimeout>();
  1922. }
  1923. return std::move(std::move(future).getTry());
  1924. }
  1925. template <class T>
  1926. Future<T>& Future<T>::wait() & {
  1927. futures::detail::waitImpl(*this);
  1928. return *this;
  1929. }
  1930. template <class T>
  1931. Future<T>&& Future<T>::wait() && {
  1932. futures::detail::waitImpl(*this);
  1933. return std::move(*this);
  1934. }
  1935. template <class T>
  1936. Future<T>& Future<T>::wait(Duration dur) & {
  1937. futures::detail::waitImpl(*this, dur);
  1938. return *this;
  1939. }
  1940. template <class T>
  1941. Future<T>&& Future<T>::wait(Duration dur) && {
  1942. futures::detail::waitImpl(*this, dur);
  1943. return std::move(*this);
  1944. }
  1945. template <class T>
  1946. Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
  1947. futures::detail::waitViaImpl(*this, e);
  1948. return *this;
  1949. }
  1950. template <class T>
  1951. Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
  1952. futures::detail::waitViaImpl(*this, e);
  1953. return std::move(*this);
  1954. }
  1955. template <class T>
  1956. Future<T>& Future<T>::waitVia(TimedDrivableExecutor* e, Duration dur) & {
  1957. futures::detail::waitViaImpl(*this, e, dur);
  1958. return *this;
  1959. }
  1960. template <class T>
  1961. Future<T>&& Future<T>::waitVia(TimedDrivableExecutor* e, Duration dur) && {
  1962. futures::detail::waitViaImpl(*this, e, dur);
  1963. return std::move(*this);
  1964. }
  1965. template <class T>
  1966. T Future<T>::get() && {
  1967. wait();
  1968. return copy(std::move(*this)).value();
  1969. }
  1970. template <class T>
  1971. T Future<T>::get(Duration dur) && {
  1972. wait(dur);
  1973. auto future = copy(std::move(*this));
  1974. if (!future.isReady()) {
  1975. throw_exception<FutureTimeout>();
  1976. }
  1977. return std::move(future).value();
  1978. }
  1979. template <class T>
  1980. Try<T>& Future<T>::getTry() {
  1981. return result();
  1982. }
  1983. template <class T>
  1984. T Future<T>::getVia(DrivableExecutor* e) {
  1985. return std::move(waitVia(e).value());
  1986. }
  1987. template <class T>
  1988. T Future<T>::getVia(TimedDrivableExecutor* e, Duration dur) {
  1989. waitVia(e, dur);
  1990. if (!this->isReady()) {
  1991. throw_exception<FutureTimeout>();
  1992. }
  1993. return std::move(value());
  1994. }
  1995. template <class T>
  1996. Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
  1997. return waitVia(e).getTry();
  1998. }
  1999. template <class T>
  2000. Try<T>& Future<T>::getTryVia(TimedDrivableExecutor* e, Duration dur) {
  2001. waitVia(e, dur);
  2002. if (!this->isReady()) {
  2003. throw_exception<FutureTimeout>();
  2004. }
  2005. return result();
  2006. }
  2007. namespace futures {
  2008. namespace detail {
  2009. template <class T>
  2010. struct TryEquals {
  2011. static bool equals(const Try<T>& t1, const Try<T>& t2) {
  2012. return t1.value() == t2.value();
  2013. }
  2014. };
  2015. } // namespace detail
  2016. } // namespace futures
  2017. template <class T>
  2018. Future<bool> Future<T>::willEqual(Future<T>& f) {
  2019. return collectAllSemiFuture(*this, f).toUnsafeFuture().thenValue(
  2020. [](const std::tuple<Try<T>, Try<T>>& t) {
  2021. if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
  2022. return futures::detail::TryEquals<T>::equals(
  2023. std::get<0>(t), std::get<1>(t));
  2024. } else {
  2025. return false;
  2026. }
  2027. });
  2028. }
  2029. template <class T>
  2030. template <class F>
  2031. Future<T> Future<T>::filter(F&& predicate) && {
  2032. return std::move(*this).thenValue([p = std::forward<F>(predicate)](T val) {
  2033. T const& valConstRef = val;
  2034. if (!p(valConstRef)) {
  2035. throw_exception<FuturePredicateDoesNotObtain>();
  2036. }
  2037. return val;
  2038. });
  2039. }
  2040. template <class F>
  2041. Future<Unit> when(bool p, F&& thunk) {
  2042. return p ? std::forward<F>(thunk)().unit() : makeFuture();
  2043. }
  2044. template <class P, class F>
  2045. Future<Unit> whileDo(P&& predicate, F&& thunk) {
  2046. if (predicate()) {
  2047. auto future = thunk();
  2048. return std::move(future).thenValue(
  2049. [predicate = std::forward<P>(predicate),
  2050. thunk = std::forward<F>(thunk)](auto&&) mutable {
  2051. return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
  2052. });
  2053. }
  2054. return makeFuture();
  2055. }
  2056. template <class F>
  2057. Future<Unit> times(const int n, F&& thunk) {
  2058. return folly::whileDo(
  2059. [n, count = std::make_unique<std::atomic<int>>(0)]() mutable {
  2060. return count->fetch_add(1, std::memory_order_relaxed) < n;
  2061. },
  2062. std::forward<F>(thunk));
  2063. }
  2064. namespace futures {
  2065. template <class It, class F, class ItT, class Result>
  2066. std::vector<Future<Result>> map(It first, It last, F func) {
  2067. std::vector<Future<Result>> results;
  2068. results.reserve(std::distance(first, last));
  2069. for (auto it = first; it != last; it++) {
  2070. FOLLY_PUSH_WARNING
  2071. FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
  2072. results.push_back(std::move(*it).then(func));
  2073. FOLLY_POP_WARNING
  2074. }
  2075. return results;
  2076. }
  2077. template <class It, class F, class ItT, class Result>
  2078. std::vector<Future<Result>> map(Executor& exec, It first, It last, F func) {
  2079. std::vector<Future<Result>> results;
  2080. results.reserve(std::distance(first, last));
  2081. for (auto it = first; it != last; it++) {
  2082. FOLLY_PUSH_WARNING
  2083. FOLLY_GNU_DISABLE_WARNING("-Wdeprecated-declarations")
  2084. results.push_back(std::move(*it).via(&exec).then(func));
  2085. FOLLY_POP_WARNING
  2086. }
  2087. return results;
  2088. }
  2089. } // namespace futures
  2090. template <class Clock>
  2091. Future<Unit> Timekeeper::at(std::chrono::time_point<Clock> when) {
  2092. auto now = Clock::now();
  2093. if (when <= now) {
  2094. return makeFuture();
  2095. }
  2096. return after(std::chrono::duration_cast<Duration>(when - now));
  2097. }
  2098. // Instantiate the most common Future types to save compile time
  2099. extern template class Future<Unit>;
  2100. extern template class Future<bool>;
  2101. extern template class Future<int>;
  2102. extern template class Future<int64_t>;
  2103. extern template class Future<std::string>;
  2104. extern template class Future<double>;
  2105. } // namespace folly