Observable-inl.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. * Copyright 2016-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. namespace folly {
  18. namespace observer {
  19. namespace detail {
  20. template <typename Observable, typename Traits>
  21. class ObserverCreatorContext {
  22. using T = typename Traits::element_type;
  23. public:
  24. template <typename... Args>
  25. ObserverCreatorContext(Args&&... args)
  26. : observable_(std::forward<Args>(args)...) {
  27. updateValue();
  28. }
  29. ~ObserverCreatorContext() {
  30. if (value_.copy()) {
  31. Traits::unsubscribe(observable_);
  32. }
  33. }
  34. void setCore(observer_detail::Core::WeakPtr coreWeak) {
  35. coreWeak_ = std::move(coreWeak);
  36. }
  37. std::shared_ptr<const T> get() {
  38. updateRequested_ = false;
  39. return value_.copy();
  40. }
  41. void update() {
  42. // This mutex ensures there's no race condition between initial update()
  43. // call and update() calls from the subsciption callback.
  44. //
  45. // Additionally it helps avoid races between two different subscription
  46. // callbacks (getting new value from observable and storing it into value_
  47. // is not atomic).
  48. std::lock_guard<std::mutex> lg(updateMutex_);
  49. if (!updateValue()) {
  50. // Value didn't change, so we can skip the version update.
  51. return;
  52. }
  53. bool expected = false;
  54. if (updateRequested_.compare_exchange_strong(expected, true)) {
  55. observer_detail::ObserverManager::scheduleRefreshNewVersion(coreWeak_);
  56. }
  57. }
  58. template <typename F>
  59. void subscribe(F&& callback) {
  60. Traits::subscribe(observable_, std::forward<F>(callback));
  61. }
  62. private:
  63. bool updateValue() {
  64. auto newValue = Traits::get(observable_);
  65. auto newValuePtr = newValue.get();
  66. if (!newValue) {
  67. throw std::logic_error("Observable returned nullptr.");
  68. }
  69. value_.swap(newValue);
  70. return newValuePtr != newValue.get();
  71. }
  72. folly::Synchronized<std::shared_ptr<const T>> value_;
  73. std::atomic<bool> updateRequested_{false};
  74. observer_detail::Core::WeakPtr coreWeak_;
  75. Observable observable_;
  76. std::mutex updateMutex_;
  77. };
  78. } // namespace detail
  79. template <typename Observable, typename Traits>
  80. template <typename... Args>
  81. ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
  82. : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
  83. template <typename Observable, typename Traits>
  84. Observer<typename ObserverCreator<Observable, Traits>::T>
  85. ObserverCreator<Observable, Traits>::getObserver() && {
  86. // This master shared_ptr allows grabbing derived weak_ptrs, pointing to the
  87. // the same Context object, but using a separate reference count. Master
  88. // shared_ptr destructor then blocks until all shared_ptrs obtained from
  89. // derived weak_ptrs are released.
  90. class ContextMasterPointer {
  91. public:
  92. explicit ContextMasterPointer(std::shared_ptr<Context> context)
  93. : contextMaster_(std::move(context)),
  94. context_(
  95. contextMaster_.get(),
  96. [destroyBaton = destroyBaton_](Context*) {
  97. destroyBaton->post();
  98. }) {}
  99. ~ContextMasterPointer() {
  100. if (context_) {
  101. context_.reset();
  102. destroyBaton_->wait();
  103. }
  104. }
  105. ContextMasterPointer(const ContextMasterPointer&) = delete;
  106. ContextMasterPointer(ContextMasterPointer&&) = default;
  107. ContextMasterPointer& operator=(const ContextMasterPointer&) = delete;
  108. ContextMasterPointer& operator=(ContextMasterPointer&&) = default;
  109. Context* operator->() const {
  110. return contextMaster_.get();
  111. }
  112. std::weak_ptr<Context> get_weak() {
  113. return context_;
  114. }
  115. private:
  116. std::shared_ptr<folly::Baton<>> destroyBaton_{
  117. std::make_shared<folly::Baton<>>()};
  118. std::shared_ptr<Context> contextMaster_;
  119. std::shared_ptr<Context> context_;
  120. };
  121. // We want to make sure that Context can only be destroyed when Core is
  122. // destroyed. So we have to avoid the situation when subscribe callback is
  123. // locking Context shared_ptr and remains the last to release it.
  124. // We solve this by having Core hold the master shared_ptr and subscription
  125. // callback gets derived weak_ptr.
  126. ContextMasterPointer contextMaster(context_);
  127. auto contextWeak = contextMaster.get_weak();
  128. auto observer = makeObserver(
  129. [context = std::move(contextMaster)]() { return context->get(); });
  130. context_->setCore(observer.core_);
  131. context_->subscribe([contextWeak = std::move(contextWeak)] {
  132. if (auto context = contextWeak.lock()) {
  133. context->update();
  134. }
  135. });
  136. // Do an extra update in case observable was updated between observer creation
  137. // and setting updates callback.
  138. context_->update();
  139. context_.reset();
  140. return observer;
  141. }
  142. } // namespace observer
  143. } // namespace folly