DefaultKeepAliveExecutor.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. /*
  2. * Copyright 2018-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <future>
  18. #include <glog/logging.h>
  19. #include <folly/Executor.h>
  20. #include <folly/synchronization/Baton.h>
  21. namespace folly {
  22. /// An Executor accepts units of work with add(), which should be
  23. /// threadsafe.
  24. class DefaultKeepAliveExecutor : public virtual Executor {
  25. public:
  26. DefaultKeepAliveExecutor() : Executor() {}
  27. virtual ~DefaultKeepAliveExecutor() {
  28. DCHECK(!keepAlive_);
  29. }
  30. folly::Executor::KeepAlive<> weakRef() {
  31. return WeakRef::create(controlBlock_, this);
  32. }
  33. protected:
  34. void joinKeepAlive() {
  35. DCHECK(keepAlive_);
  36. keepAlive_.reset();
  37. keepAliveReleaseBaton_.wait();
  38. }
  39. private:
  40. struct ControlBlock {
  41. std::atomic<ssize_t> keepAliveCount_{1};
  42. };
  43. class WeakRef : public Executor {
  44. public:
  45. static folly::Executor::KeepAlive<> create(
  46. std::shared_ptr<ControlBlock> controlBlock,
  47. Executor* executor) {
  48. return makeKeepAlive(new WeakRef(std::move(controlBlock), executor));
  49. }
  50. void add(Func f) override {
  51. if (auto executor = lock()) {
  52. executor->add(std::move(f));
  53. }
  54. }
  55. void addWithPriority(Func f, int8_t priority) override {
  56. if (auto executor = lock()) {
  57. executor->addWithPriority(std::move(f), priority);
  58. }
  59. }
  60. virtual uint8_t getNumPriorities() const override {
  61. return numPriorities_;
  62. }
  63. private:
  64. WeakRef(std::shared_ptr<ControlBlock> controlBlock, Executor* executor)
  65. : controlBlock_(std::move(controlBlock)),
  66. executor_(executor),
  67. numPriorities_(executor->getNumPriorities()) {}
  68. bool keepAliveAcquire() override {
  69. auto keepAliveCount =
  70. keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
  71. // We should never increment from 0
  72. DCHECK(keepAliveCount > 0);
  73. return true;
  74. }
  75. void keepAliveRelease() override {
  76. auto keepAliveCount =
  77. keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
  78. DCHECK(keepAliveCount >= 1);
  79. if (keepAliveCount == 1) {
  80. delete this;
  81. }
  82. }
  83. folly::Executor::KeepAlive<> lock() {
  84. auto controlBlock =
  85. controlBlock_->keepAliveCount_.load(std::memory_order_relaxed);
  86. do {
  87. if (controlBlock == 0) {
  88. return {};
  89. }
  90. } while (!controlBlock_->keepAliveCount_.compare_exchange_weak(
  91. controlBlock,
  92. controlBlock + 1,
  93. std::memory_order_release,
  94. std::memory_order_relaxed));
  95. return makeKeepAlive(executor_);
  96. }
  97. std::atomic<size_t> keepAliveCount_{1};
  98. std::shared_ptr<ControlBlock> controlBlock_;
  99. Executor* executor_;
  100. uint8_t numPriorities_;
  101. };
  102. bool keepAliveAcquire() override {
  103. auto keepAliveCount =
  104. controlBlock_->keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
  105. // We should never increment from 0
  106. DCHECK(keepAliveCount > 0);
  107. return true;
  108. }
  109. void keepAliveRelease() override {
  110. auto keepAliveCount =
  111. controlBlock_->keepAliveCount_.fetch_sub(1, std::memory_order_acquire);
  112. DCHECK(keepAliveCount >= 1);
  113. if (keepAliveCount == 1) {
  114. keepAliveReleaseBaton_.post(); // std::memory_order_release
  115. }
  116. }
  117. std::shared_ptr<ControlBlock> controlBlock_{std::make_shared<ControlBlock>()};
  118. Baton<> keepAliveReleaseBaton_;
  119. KeepAlive<DefaultKeepAliveExecutor> keepAlive_{
  120. makeKeepAlive<DefaultKeepAliveExecutor>(this)};
  121. };
  122. } // namespace folly