ObserverTest.cpp 8.9 KB


  1. /*
  2. * Copyright 2016-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <thread>
  17. #include <folly/experimental/observer/SimpleObservable.h>
  18. #include <folly/portability/GTest.h>
  19. #include <folly/synchronization/Baton.h>
  20. using namespace folly::observer;
  21. TEST(Observer, Observable) {
  22. SimpleObservable<int> observable(42);
  23. auto observer = observable.getObserver();
  24. EXPECT_EQ(42, **observer);
  25. folly::Baton<> baton;
  26. auto waitingObserver = makeObserver([observer, &baton]() {
  27. *observer;
  28. baton.post();
  29. return folly::Unit();
  30. });
  31. baton.reset();
  32. observable.setValue(24);
  33. EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
  34. EXPECT_EQ(24, **observer);
  35. }
  36. TEST(Observer, MakeObserver) {
  37. SimpleObservable<int> observable(42);
  38. auto observer = makeObserver(
  39. [child = observable.getObserver()]() { return **child + 1; });
  40. EXPECT_EQ(43, **observer);
  41. folly::Baton<> baton;
  42. auto waitingObserver = makeObserver([observer, &baton]() {
  43. *observer;
  44. baton.post();
  45. return folly::Unit();
  46. });
  47. baton.reset();
  48. observable.setValue(24);
  49. EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
  50. EXPECT_EQ(25, **observer);
  51. }
  52. TEST(Observer, MakeObserverDiamond) {
  53. SimpleObservable<int> observable(42);
  54. auto observer1 = makeObserver(
  55. [child = observable.getObserver()]() { return **child + 1; });
  56. auto observer2 = makeObserver([child = observable.getObserver()]() {
  57. return std::make_shared<int>(**child + 2);
  58. });
  59. auto observer = makeObserver(
  60. [observer1, observer2]() { return (**observer1) * (**observer2); });
  61. EXPECT_EQ(43 * 44, *observer.getSnapshot());
  62. folly::Baton<> baton;
  63. auto waitingObserver = makeObserver([observer, &baton]() {
  64. *observer;
  65. baton.post();
  66. return folly::Unit();
  67. });
  68. baton.reset();
  69. observable.setValue(24);
  70. EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
  71. EXPECT_EQ(25 * 26, **observer);
  72. }
  73. TEST(Observer, CreateException) {
  74. struct ExpectedException {};
  75. EXPECT_THROW(
  76. auto observer = makeObserver(
  77. []() -> std::shared_ptr<int> { throw ExpectedException(); }),
  78. ExpectedException);
  79. EXPECT_THROW(
  80. auto observer =
  81. makeObserver([]() -> std::shared_ptr<int> { return nullptr; }),
  82. std::logic_error);
  83. }
  84. TEST(Observer, NullValue) {
  85. SimpleObservable<int> observable(41);
  86. auto oddObserver = makeObserver([innerObserver = observable.getObserver()]() {
  87. auto value = **innerObserver;
  88. if (value % 2 != 0) {
  89. return value * 2;
  90. }
  91. throw std::logic_error("I prefer odd numbers");
  92. });
  93. folly::Baton<> baton;
  94. auto waitingObserver = makeObserver([oddObserver, &baton]() {
  95. *oddObserver;
  96. baton.post();
  97. return folly::Unit();
  98. });
  99. baton.reset();
  100. EXPECT_EQ(82, **oddObserver);
  101. observable.setValue(2);
  102. // Waiting observer shouldn't be updated
  103. EXPECT_FALSE(baton.try_wait_for(std::chrono::seconds{1}));
  104. baton.reset();
  105. EXPECT_EQ(82, **oddObserver);
  106. observable.setValue(23);
  107. EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
  108. EXPECT_EQ(46, **oddObserver);
  109. }
  110. TEST(Observer, Cycle) {
  111. SimpleObservable<int> observable(0);
  112. auto observer = observable.getObserver();
  113. folly::Optional<Observer<int>> observerB;
  114. auto observerA = makeObserver([observer, &observerB]() {
  115. auto value = **observer;
  116. if (value == 1) {
  117. **observerB;
  118. }
  119. return value;
  120. });
  121. observerB = makeObserver([observerA]() { return **observerA; });
  122. auto collectObserver = makeObserver([observer, observerA, &observerB]() {
  123. auto value = **observer;
  124. auto valueA = **observerA;
  125. auto valueB = ***observerB;
  126. if (value == 1) {
  127. if (valueA == 0) {
  128. EXPECT_EQ(0, valueB);
  129. } else {
  130. EXPECT_EQ(1, valueA);
  131. EXPECT_EQ(0, valueB);
  132. }
  133. } else if (value == 2) {
  134. EXPECT_EQ(value, valueA);
  135. EXPECT_TRUE(valueB == 0 || valueB == 2);
  136. } else {
  137. EXPECT_EQ(value, valueA);
  138. EXPECT_EQ(value, valueB);
  139. }
  140. return value;
  141. });
  142. folly::Baton<> baton;
  143. auto waitingObserver = makeObserver([collectObserver, &baton]() {
  144. *collectObserver;
  145. baton.post();
  146. return folly::Unit();
  147. });
  148. baton.reset();
  149. EXPECT_EQ(0, **collectObserver);
  150. for (size_t i = 1; i <= 3; ++i) {
  151. observable.setValue(i);
  152. EXPECT_TRUE(baton.try_wait_for(std::chrono::seconds{1}));
  153. baton.reset();
  154. EXPECT_EQ(i, **collectObserver);
  155. }
  156. }
  157. TEST(Observer, Stress) {
  158. SimpleObservable<int> observable(0);
  159. auto values = std::make_shared<folly::Synchronized<std::vector<int>>>();
  160. auto observer = makeObserver([child = observable.getObserver(), values]() {
  161. auto value = **child * 10;
  162. values->withWLock([&](std::vector<int>& vals) { vals.push_back(value); });
  163. return value;
  164. });
  165. EXPECT_EQ(0, **observer);
  166. values->withRLock([](const std::vector<int>& vals) {
  167. EXPECT_EQ(1, vals.size());
  168. EXPECT_EQ(0, vals.back());
  169. });
  170. constexpr size_t numIters = 10000;
  171. for (size_t i = 1; i <= numIters; ++i) {
  172. observable.setValue(i);
  173. }
  174. while (**observer != numIters * 10) {
  175. std::this_thread::yield();
  176. }
  177. values->withRLock([numIters = numIters](const std::vector<int>& vals) {
  178. EXPECT_EQ(numIters * 10, vals.back());
  179. EXPECT_LT(vals.size(), numIters / 2);
  180. EXPECT_EQ(0, vals[0]);
  181. EXPECT_EQ(numIters * 10, vals.back());
  182. for (auto value : vals) {
  183. EXPECT_EQ(0, value % 10);
  184. }
  185. for (size_t i = 0; i < vals.size() - 1; ++i) {
  186. EXPECT_LE(vals[i], vals[i + 1]);
  187. }
  188. });
  189. }
  190. TEST(Observer, TLObserver) {
  191. auto createTLObserver = [](int value) {
  192. return folly::observer::makeTLObserver([=] { return value; });
  193. };
  194. auto k =
  195. std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(42));
  196. EXPECT_EQ(42, ***k);
  197. k = std::make_unique<folly::observer::TLObserver<int>>(createTLObserver(41));
  198. EXPECT_EQ(41, ***k);
  199. }
  200. TEST(Observer, SubscribeCallback) {
  201. static auto mainThreadId = std::this_thread::get_id();
  202. static std::function<void()> updatesCob;
  203. static bool slowGet = false;
  204. static std::atomic<size_t> getCallsStart{0};
  205. static std::atomic<size_t> getCallsFinish{0};
  206. struct Observable {
  207. ~Observable() {
  208. EXPECT_EQ(mainThreadId, std::this_thread::get_id());
  209. }
  210. };
  211. struct Traits {
  212. using element_type = int;
  213. static std::shared_ptr<const int> get(Observable&) {
  214. ++getCallsStart;
  215. if (slowGet) {
  216. /* sleep override */ std::this_thread::sleep_for(
  217. std::chrono::seconds{2});
  218. }
  219. ++getCallsFinish;
  220. return std::make_shared<const int>(42);
  221. }
  222. static void subscribe(Observable&, std::function<void()> cob) {
  223. updatesCob = std::move(cob);
  224. }
  225. static void unsubscribe(Observable&) {}
  226. };
  227. std::thread cobThread;
  228. {
  229. auto observer =
  230. folly::observer::ObserverCreator<Observable, Traits>().getObserver();
  231. EXPECT_TRUE(updatesCob);
  232. EXPECT_EQ(2, getCallsStart);
  233. EXPECT_EQ(2, getCallsFinish);
  234. updatesCob();
  235. EXPECT_EQ(3, getCallsStart);
  236. EXPECT_EQ(3, getCallsFinish);
  237. slowGet = true;
  238. cobThread = std::thread([] { updatesCob(); });
  239. /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
  240. EXPECT_EQ(4, getCallsStart);
  241. EXPECT_EQ(3, getCallsFinish);
  242. // Observer is destroyed here
  243. }
  244. // Make sure that destroying the observer actually joined the updates callback
  245. EXPECT_EQ(4, getCallsStart);
  246. EXPECT_EQ(4, getCallsFinish);
  247. cobThread.join();
  248. }
  249. TEST(Observer, SetCallback) {
  250. folly::observer::SimpleObservable<int> observable(42);
  251. auto observer = observable.getObserver();
  252. folly::Baton<> baton;
  253. int callbackValue = 0;
  254. size_t callbackCallsCount = 0;
  255. auto callbackHandle =
  256. observer.addCallback([&](folly::observer::Snapshot<int> snapshot) {
  257. ++callbackCallsCount;
  258. callbackValue = *snapshot;
  259. baton.post();
  260. });
  261. baton.wait();
  262. baton.reset();
  263. EXPECT_EQ(42, callbackValue);
  264. EXPECT_EQ(1, callbackCallsCount);
  265. observable.setValue(43);
  266. baton.wait();
  267. baton.reset();
  268. EXPECT_EQ(43, callbackValue);
  269. EXPECT_EQ(2, callbackCallsCount);
  270. callbackHandle.cancel();
  271. observable.setValue(44);
  272. EXPECT_FALSE(baton.timed_wait(std::chrono::milliseconds{100}));
  273. EXPECT_EQ(43, callbackValue);
  274. EXPECT_EQ(2, callbackCallsCount);
  275. }
  276. int makeObserverRecursion(int n) {
  277. if (n == 0) {
  278. return 0;
  279. }
  280. return **makeObserver([=] { return makeObserverRecursion(n - 1) + 1; });
  281. }
  282. TEST(Observer, NestedMakeObserver) {
  283. EXPECT_EQ(32, makeObserverRecursion(32));
  284. }