Core.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. /*
  2. * Copyright 2016-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/experimental/observer/detail/Core.h>
  17. #include <folly/ExceptionString.h>
  18. #include <folly/experimental/observer/detail/ObserverManager.h>
  19. namespace folly {
  20. namespace observer_detail {
  21. Core::VersionedData Core::getData() {
  22. if (!ObserverManager::inManagerThread()) {
  23. return data_.copy();
  24. }
  25. ObserverManager::DependencyRecorder::markDependency(shared_from_this());
  26. auto version = ObserverManager::getVersion();
  27. if (version_ >= version) {
  28. return data_.copy();
  29. }
  30. refresh(version);
  31. DCHECK_GE(version_, version);
  32. return data_.copy();
  33. }
  34. size_t Core::refresh(size_t version, bool force) {
  35. CHECK(ObserverManager::inManagerThread());
  36. ObserverManager::DependencyRecorder::markRefreshDependency(*this);
  37. SCOPE_EXIT {
  38. ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
  39. };
  40. if (version_ >= version) {
  41. return versionLastChange_;
  42. }
  43. {
  44. std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
  45. // Recheck in case this code was already refreshed
  46. if (version_ >= version) {
  47. return versionLastChange_;
  48. }
  49. bool needRefresh = force || version_ == 0;
  50. ObserverManager::DependencyRecorder dependencyRecorder(*this);
  51. // This can be run in parallel, but we expect most updates to propagate
  52. // bottom to top.
  53. dependencies_.withRLock([&](const Dependencies& dependencies) {
  54. for (const auto& dependency : dependencies) {
  55. try {
  56. if (dependency->refresh(version) > version_) {
  57. needRefresh = true;
  58. break;
  59. }
  60. } catch (...) {
  61. LOG(ERROR) << "Exception while checking dependencies for updates: "
  62. << exceptionStr(std::current_exception());
  63. needRefresh = true;
  64. break;
  65. }
  66. }
  67. });
  68. if (!needRefresh) {
  69. version_ = version;
  70. return versionLastChange_;
  71. }
  72. try {
  73. {
  74. VersionedData newData{creator_(), version};
  75. if (!newData.data) {
  76. throw std::logic_error("Observer creator returned nullptr.");
  77. }
  78. data_.swap(newData);
  79. }
  80. versionLastChange_ = version;
  81. } catch (...) {
  82. LOG(ERROR) << "Exception while refreshing Observer: "
  83. << exceptionStr(std::current_exception());
  84. if (version_ == 0) {
  85. // Re-throw exception if this is the first time we run creator
  86. throw;
  87. }
  88. }
  89. version_ = version;
  90. if (versionLastChange_ != version) {
  91. return versionLastChange_;
  92. }
  93. auto newDependencies = dependencyRecorder.release();
  94. dependencies_.withWLock([&](Dependencies& dependencies) {
  95. for (const auto& dependency : newDependencies) {
  96. if (!dependencies.count(dependency)) {
  97. dependency->addDependent(this->shared_from_this());
  98. }
  99. }
  100. for (const auto& dependency : dependencies) {
  101. if (!newDependencies.count(dependency)) {
  102. dependency->removeStaleDependents();
  103. }
  104. }
  105. dependencies = std::move(newDependencies);
  106. });
  107. }
  108. auto dependents = dependents_.copy();
  109. for (const auto& dependentWeak : dependents) {
  110. if (auto dependent = dependentWeak.lock()) {
  111. ObserverManager::scheduleRefresh(std::move(dependent), version);
  112. }
  113. }
  114. return versionLastChange_;
  115. }
  116. Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
  117. : creator_(std::move(creator)) {}
  118. Core::~Core() {
  119. dependencies_.withWLock([](const Dependencies& dependencies) {
  120. for (const auto& dependecy : dependencies) {
  121. dependecy->removeStaleDependents();
  122. }
  123. });
  124. }
  125. Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
  126. auto core = Core::Ptr(new Core(std::move(creator)));
  127. return core;
  128. }
  129. void Core::addDependent(Core::WeakPtr dependent) {
  130. dependents_.withWLock([&](Dependents& dependents) {
  131. dependents.push_back(std::move(dependent));
  132. });
  133. }
  134. void Core::removeStaleDependents() {
  135. // This is inefficient, the assumption is that we won't have many dependents
  136. dependents_.withWLock([](Dependents& dependents) {
  137. for (size_t i = 0; i < dependents.size(); ++i) {
  138. if (dependents[i].expired()) {
  139. std::swap(dependents[i], dependents.back());
  140. dependents.pop_back();
  141. --i;
  142. }
  143. }
  144. });
  145. }
  146. } // namespace observer_detail
  147. } // namespace folly