DistributedMutexTest.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. /*
  2. * Copyright 2018-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/synchronization/DistributedMutex.h>
  17. #include <folly/MapUtil.h>
  18. #include <folly/Synchronized.h>
  19. #include <folly/container/Foreach.h>
  20. #include <folly/portability/GTest.h>
  21. #include <folly/synchronization/Baton.h>
  22. #include <folly/test/DeterministicSchedule.h>
  23. #include <chrono>
  24. #include <thread>
  25. using namespace std::literals;
  26. namespace folly {
  27. namespace test {
  28. /**
  29. * Like DeterministicSchedule, but allows setting callbacks that can be run
  30. * for the current thread when an atomic access occurs, and after. This
  31. * allows us to construct thread interleavings by hand
  32. *
  33. * Constructing a ManualSchedule is required to ensure that we maintain
  34. * per-test state for threads
  35. *
  36. * This can also be used to order thread movement, as an alternative to
  37. * maintaining condition variables and/or semaphores for the purposes of
  38. * testing, for example
  39. *
  40. * auto one = std::thread{[&]() {
  41. * schedule.wait(1);
  42. * two();
  43. * schedule.post(2);
  44. * }};
  45. *
  46. * auto two = std::thread{[&]() {
  47. * one();
  48. * schedule.post(1);
  49. * schedule.wait(2);
  50. * three();
  51. * }};
  52. *
  53. * The code above is guaranteed to call one(), then two(), and then three()
  54. */
  55. class ManualSchedule {
  56. public:
  57. ManualSchedule() = default;
  58. ~ManualSchedule() {
  59. // delete this schedule from the global map
  60. auto schedules = schedules_.wlock();
  61. for_each(*schedules, [&](auto& schedule, auto, auto iter) {
  62. if (schedule.second == this) {
  63. schedules->erase(iter);
  64. }
  65. });
  66. }
  67. /**
  68. * These will be invoked by DeterministicAtomic to signal atomic access
  69. * before and after the operation
  70. */
  71. static void beforeSharedAccess() {
  72. if (folly::kIsDebug) {
  73. auto id = std::this_thread::get_id();
  74. // get the schedule assigned for the current thread, if one exists,
  75. // otherwise proceed as normal
  76. auto schedule = get_ptr(*schedules_.wlock(), id);
  77. if (!schedule) {
  78. return;
  79. }
  80. // now try and get the callbacks for this thread, if there is a callback
  81. // registered for the test, it must mean that we have a callback
  82. auto callback = get_ptr((*(*schedule)->callbacks_.wlock()), id);
  83. if (!callback) {
  84. return;
  85. }
  86. (*callback)();
  87. }
  88. }
  89. static void afterSharedAccess(bool) {
  90. beforeSharedAccess();
  91. }
  92. /**
  93. * Set a callback that will be called on every subsequent atomic access.
  94. * This will be invoked before and after every atomic access, for the thread
  95. * that called setCallback
  96. */
  97. void setCallback(std::function<void()> callback) {
  98. schedules_.wlock()->insert({std::this_thread::get_id(), this});
  99. callbacks_.wlock()->insert({std::this_thread::get_id(), callback});
  100. }
  101. /**
  102. * Delete the callback set for this thread on atomic accesses
  103. */
  104. void removeCallbacks() {
  105. callbacks_.wlock()->erase(std::this_thread::get_id());
  106. }
  107. /**
  108. * wait() and post() for easy testing
  109. */
  110. void wait(int id) {
  111. if (folly::kIsDebug) {
  112. auto& baton = (*batons_.wlock())[id];
  113. baton.wait();
  114. }
  115. }
  116. void post(int id) {
  117. if (folly::kIsDebug) {
  118. auto& baton = (*batons_.wlock())[id];
  119. baton.post();
  120. }
  121. }
  122. private:
  123. // the map of threads to the schedule started for that test
  124. static Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
  125. schedules_;
  126. // the map of callbacks to be executed for a thread's atomic accesses
  127. Synchronized<std::unordered_map<std::thread::id, std::function<void()>>>
  128. callbacks_;
  129. // batons for testing, this map will only ever be written to, so it is safe
  130. // to hold references outside lock
  131. Synchronized<std::unordered_map<int, folly::Baton<>>> batons_;
  132. };
  133. Synchronized<std::unordered_map<std::thread::id, ManualSchedule*>>
  134. ManualSchedule::schedules_;
  135. template <typename T>
  136. using ManualAtomic = test::DeterministicAtomicImpl<T, ManualSchedule>;
  137. template <template <typename> class Atomic>
  138. using TestDistributedMutex =
  139. detail::distributed_mutex::DistributedMutex<Atomic, false>;
  140. /**
  141. * Futex extensions for ManualAtomic
  142. *
  143. * Note that doing nothing in these should still result in a program that is
  144. * well defined, since futex wait calls should be tolerant to spurious wakeups
  145. */
  146. int futexWakeImpl(const detail::Futex<ManualAtomic>*, int, uint32_t) {
  147. ManualSchedule::beforeSharedAccess();
  148. return 1;
  149. }
  150. detail::FutexResult futexWaitImpl(
  151. const detail::Futex<ManualAtomic>*,
  152. uint32_t,
  153. std::chrono::system_clock::time_point const*,
  154. std::chrono::steady_clock::time_point const*,
  155. uint32_t) {
  156. ManualSchedule::beforeSharedAccess();
  157. return detail::FutexResult::AWOKEN;
  158. }
  159. template <typename Clock, typename Duration>
  160. std::cv_status atomic_wait_until(
  161. const ManualAtomic<std::uintptr_t>*,
  162. std::uintptr_t,
  163. const std::chrono::time_point<Clock, Duration>&) {
  164. ManualSchedule::beforeSharedAccess();
  165. return std::cv_status::no_timeout;
  166. }
  167. void atomic_notify_one(const ManualAtomic<std::uintptr_t>*) {
  168. ManualSchedule::beforeSharedAccess();
  169. }
  170. } // namespace test
  171. namespace {
  172. DEFINE_int32(stress_factor, 1000, "The stress test factor for tests");
  173. constexpr auto kForever = 100h;
  174. using DSched = test::DeterministicSchedule;
  /**
   * Returns 1 + 2 + ... + n, i.e. n * (n + 1) / 2.
   *
   * Exactly one of n and n + 1 is even, so the even factor is halved before
   * multiplying.  This keeps the intermediate product equal to the final
   * result, so the computation no longer overflows int for inputs whose
   * result still fits (the plain n * (n + 1) form overflows once n > 46340).
   */
  int sum(int n) {
    if (n % 2 == 0) {
      return (n / 2) * (n + 1);
    }
    return n * ((n + 1) / 2);
  }
  178. template <template <typename> class Atom = std::atomic>
  179. void basicNThreads(int numThreads, int iterations = FLAGS_stress_factor) {
  180. auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  181. auto&& barrier = std::atomic<int>{0};
  182. auto&& threads = std::vector<std::thread>{};
  183. auto&& result = std::vector<int>{};
  184. auto&& function = [&](auto id) {
  185. return [&, id] {
  186. for (auto j = 0; j < iterations; ++j) {
  187. auto state = mutex.lock();
  188. EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
  189. result.push_back(id);
  190. EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
  191. mutex.unlock(std::move(state));
  192. }
  193. };
  194. };
  195. for (auto i = 1; i <= numThreads; ++i) {
  196. threads.push_back(DSched::thread(function(i)));
  197. }
  198. for (auto& thread : threads) {
  199. DSched::join(thread);
  200. }
  201. auto total = 0;
  202. for (auto value : result) {
  203. total += value;
  204. }
  205. EXPECT_EQ(total, sum(numThreads) * iterations);
  206. }
  207. } // namespace
  208. TEST(DistributedMutex, InternalDetailTestOne) {
  209. auto value = 0;
  210. auto ptr = reinterpret_cast<std::uintptr_t>(&value);
  211. EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
  212. ptr = ptr | 0b1;
  213. EXPECT_EQ(detail::distributed_mutex::extractAddress<int>(ptr), &value);
  214. }
  215. TEST(DistributedMutex, Basic) {
  216. auto&& mutex = DistributedMutex{};
  217. auto state = mutex.lock();
  218. mutex.unlock(std::move(state));
  219. }
  220. TEST(DistributedMutex, BasicTryLock) {
  221. auto&& mutex = DistributedMutex{};
  222. while (true) {
  223. auto state = mutex.try_lock();
  224. if (state) {
  225. mutex.unlock(std::move(state));
  226. break;
  227. }
  228. }
  229. }
  230. TEST(DistributedMutex, TestSingleElementContentionChain) {
  231. using namespace folly::detail;
  232. // Acquire the mutex once, let another thread form a contention chain on the
  233. // mutex, and then release it. Observe the other thread grab the lock
  234. auto&& schedule = test::ManualSchedule{};
  235. auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  236. auto&& waiter = std::thread{[&]() {
  237. schedule.setCallback([&, i = 0]() mutable {
  238. if (i == 2) {
  239. schedule.post(1);
  240. }
  241. ++i;
  242. });
  243. schedule.wait(0);
  244. auto state = mutex.lock();
  245. mutex.unlock(std::move(state));
  246. }};
  247. // lock the mutex, signal the waiter, and then wait till the first thread
  248. // has gotten on the wait list
  249. auto state = mutex.lock();
  250. schedule.post(0);
  251. schedule.wait(1);
  252. // release the mutex, and then wait for the waiter to acquire the lock
  253. mutex.unlock(std::move(state));
  254. waiter.join();
  255. }
  256. TEST(DistributedMutex, TestTwoElementContentionChain) {
  257. using namespace folly::detail;
  258. // Acquire the mutex once, let another thread form a contention chain on the
  259. // mutex, and then release it. Observe the other thread grab the lock
  260. auto&& schedule = test::ManualSchedule{};
  261. auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  262. auto&& one = std::thread{[&]() {
  263. schedule.setCallback([&, i = 0]() mutable {
  264. if (i == 2) {
  265. schedule.post(3);
  266. }
  267. ++i;
  268. });
  269. schedule.wait(0);
  270. auto state = mutex.lock();
  271. mutex.unlock(std::move(state));
  272. }};
  273. auto&& two = std::thread{[&]() {
  274. schedule.setCallback([&, i = 0]() mutable {
  275. if (i == 2) {
  276. schedule.post(2);
  277. }
  278. ++i;
  279. });
  280. schedule.wait(1);
  281. auto state = mutex.lock();
  282. mutex.unlock(std::move(state));
  283. }};
  284. // lock the mutex, signal the waiter, and then wait till the first thread
  285. // has gotten on the wait list
  286. auto state = mutex.lock();
  287. schedule.post(0);
  288. schedule.post(1);
  289. schedule.wait(2);
  290. schedule.wait(3);
  291. // release the mutex, and then wait for the waiter to acquire the lock
  292. mutex.unlock(std::move(state));
  293. one.join();
  294. two.join();
  295. }
  296. TEST(DistributedMutex, TestTwoContentionChains) {
  297. using namespace folly::detail;
  298. auto&& schedule = test::ManualSchedule{};
  299. auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  300. auto&& one = std::thread{[&]() {
  301. schedule.setCallback([&, i = 0]() mutable {
  302. if (i == 2) {
  303. schedule.post(0);
  304. }
  305. ++i;
  306. });
  307. schedule.wait(1);
  308. auto state = mutex.lock();
  309. schedule.wait(4);
  310. mutex.unlock(std::move(state));
  311. }};
  312. auto&& two = std::thread{[&]() {
  313. schedule.setCallback([&, i = 0]() mutable {
  314. if (i == 2) {
  315. schedule.post(2);
  316. }
  317. ++i;
  318. });
  319. schedule.wait(3);
  320. auto state = mutex.lock();
  321. schedule.wait(5);
  322. mutex.unlock(std::move(state));
  323. }};
  324. auto state = mutex.lock();
  325. schedule.post(1);
  326. schedule.post(3);
  327. schedule.wait(0);
  328. schedule.wait(2);
  329. // at this point there is one contention chain. Release it
  330. mutex.unlock(std::move(state));
  331. // then start a new contention chain
  332. auto&& three = std::thread{[&]() {
  333. schedule.setCallback([&, i = 0]() mutable {
  334. if (i == 2) {
  335. schedule.post(4);
  336. schedule.post(5);
  337. }
  338. ++i;
  339. });
  340. auto lockState = mutex.lock();
  341. schedule.post(6);
  342. mutex.unlock(std::move(lockState));
  343. }};
  344. // wait for the third thread to pick up the lock
  345. schedule.wait(6);
  346. one.join();
  347. two.join();
  348. three.join();
  349. }
  350. TEST(DistributedMutex, StressTwoThreads) {
  351. basicNThreads(2);
  352. }
  353. TEST(DistributedMutex, StressThreeThreads) {
  354. basicNThreads(3);
  355. }
  356. TEST(DistributedMutex, StressFourThreads) {
  357. basicNThreads(4);
  358. }
  359. TEST(DistributedMutex, StressFiveThreads) {
  360. basicNThreads(5);
  361. }
  362. TEST(DistributedMutex, StressSixThreads) {
  363. basicNThreads(6);
  364. }
  365. TEST(DistributedMutex, StressSevenThreads) {
  366. basicNThreads(7);
  367. }
  368. TEST(DistributedMutex, StressEightThreads) {
  369. basicNThreads(8);
  370. }
  371. TEST(DistributedMutex, StressSixteenThreads) {
  372. basicNThreads(16);
  373. }
  374. TEST(DistributedMutex, StressThirtyTwoThreads) {
  375. basicNThreads(32);
  376. }
  377. TEST(DistributedMutex, StressSixtyFourThreads) {
  378. basicNThreads(64);
  379. }
  380. TEST(DistributedMutex, StressHundredThreads) {
  381. basicNThreads(100);
  382. }
  383. TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
  384. basicNThreads(std::thread::hardware_concurrency());
  385. }
  386. TEST(DistributedMutex, StressTryLock) {
  387. auto&& mutex = DistributedMutex{};
  388. for (auto i = 0; i < FLAGS_stress_factor; ++i) {
  389. while (true) {
  390. auto state = mutex.try_lock();
  391. if (state) {
  392. mutex.unlock(std::move(state));
  393. break;
  394. }
  395. }
  396. }
  397. }
  398. namespace {
  // Iteration count for the deterministic tests: 100 for up to eight threads,
  // 10 for anything larger
  constexpr auto numIterationsDeterministicTest(int threads) {
    return (threads > 8) ? 10 : 100;
  }
  405. void runBasicNThreadsDeterministic(int threads, int iterations) {
  406. for (auto pass = 0; pass < 3; ++pass) {
  407. auto&& schedule = DSched{DSched::uniform(pass)};
  408. basicNThreads<test::DeterministicAtomic>(threads, iterations);
  409. static_cast<void>(schedule);
  410. }
  411. }
  412. } // namespace
  413. TEST(DistributedMutex, DeterministicStressTwoThreads) {
  414. runBasicNThreadsDeterministic(2, numIterationsDeterministicTest(2));
  415. }
  416. TEST(DistributedMutex, DeterministicStressFourThreads) {
  417. runBasicNThreadsDeterministic(4, numIterationsDeterministicTest(4));
  418. }
  419. TEST(DistributedMutex, DeterministicStressEightThreads) {
  420. runBasicNThreadsDeterministic(8, numIterationsDeterministicTest(8));
  421. }
  422. TEST(DistributedMutex, DeterministicStressSixteenThreads) {
  423. runBasicNThreadsDeterministic(16, numIterationsDeterministicTest(16));
  424. }
  425. TEST(DistributedMutex, DeterministicStressThirtyTwoThreads) {
  426. runBasicNThreadsDeterministic(32, numIterationsDeterministicTest(32));
  427. }
  428. TEST(DistributedMutex, TimedLockTimeout) {
  429. auto&& mutex = DistributedMutex{};
  430. auto&& start = folly::Baton<>{};
  431. auto&& done = folly::Baton<>{};
  432. auto thread = std::thread{[&]() {
  433. auto state = mutex.lock();
  434. start.post();
  435. done.wait();
  436. mutex.unlock(std::move(state));
  437. }};
  438. start.wait();
  439. auto result = mutex.try_lock_for(10ms);
  440. EXPECT_FALSE(result);
  441. done.post();
  442. thread.join();
  443. }
  444. TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
  445. auto&& mutex = DistributedMutex{};
  446. auto&& start = folly::Baton<>{};
  447. auto thread = std::thread{[&]() {
  448. auto state = mutex.lock();
  449. start.post();
  450. /* sleep override */
  451. std::this_thread::sleep_for(10ms);
  452. mutex.unlock(std::move(state));
  453. }};
  454. start.wait();
  455. auto result = mutex.try_lock_for(kForever);
  456. EXPECT_TRUE(result);
  457. thread.join();
  458. }
  459. TEST(DistributedMutex, TimedLockAcquireAfterLock) {
  460. auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  461. auto&& schedule = test::ManualSchedule{};
  462. auto thread = std::thread{[&] {
  463. schedule.setCallback([&, i = 0]() mutable {
  464. if (i == 1) {
  465. schedule.post(0);
  466. schedule.wait(1);
  467. }
  468. // when this thread goes into the atomic_notify_one() we let the other
  469. // thread wake up
  470. if (i == 3) {
  471. schedule.post(2);
  472. }
  473. ++i;
  474. });
  475. auto state = mutex.lock();
  476. mutex.unlock(std::move(state));
  477. }};
  478. schedule.setCallback([&, i = 0]() mutable {
  479. // allow the other thread to unlock after the current thread has set the
  480. // timed waiter state into the mutex
  481. if (i == 2) {
  482. schedule.post(1);
  483. schedule.wait(2);
  484. }
  485. ++i;
  486. });
  487. schedule.wait(0);
  488. auto state = mutex.try_lock_for(kForever);
  489. EXPECT_TRUE(state);
  490. mutex.unlock(std::move(state));
  491. thread.join();
  492. }
  493. TEST(DistributedMutex, TimedLockAcquireAfterContentionChain) {
  494. auto&& mutex = test::TestDistributedMutex<test::ManualAtomic>{};
  495. auto&& schedule = test::ManualSchedule{};
  496. auto one = std::thread{[&] {
  497. schedule.setCallback([&, i = 0]() mutable {
  498. if (i == 1) {
  499. schedule.post(0);
  500. schedule.wait(1);
  501. schedule.wait(2);
  502. }
  503. ++i;
  504. });
  505. auto state = mutex.lock();
  506. mutex.unlock(std::move(state));
  507. }};
  508. auto two = std::thread{[&] {
  509. schedule.setCallback([&, i = 0]() mutable {
  510. // block the current thread until the first thread has acquired the
  511. // lock
  512. if (i == 0) {
  513. schedule.wait(0);
  514. }
  515. // when the current thread enqueues, let the first thread unlock so we
  516. // get woken up
  517. //
  518. // then wait for the first thread to unlock
  519. if (i == 2) {
  520. schedule.post(1);
  521. }
  522. ++i;
  523. });
  524. auto state = mutex.lock();
  525. mutex.unlock(std::move(state));
  526. }};
  527. // make the current thread wait for the first thread to unlock
  528. schedule.setCallback([&, i = 0]() mutable {
  529. // let the first thread unlock after we have enqueued ourselves on the
  530. // mutex
  531. if (i == 2) {
  532. schedule.post(2);
  533. }
  534. ++i;
  535. });
  536. auto state = mutex.try_lock_for(kForever);
  537. EXPECT_TRUE(state);
  538. mutex.unlock(std::move(state));
  539. one.join();
  540. two.join();
  541. }
  542. namespace {
  543. template <template <typename> class Atom = std::atomic>
  544. void stressTryLockWithConcurrentLocks(
  545. int numThreads,
  546. int iterations = FLAGS_stress_factor) {
  547. auto&& threads = std::vector<std::thread>{};
  548. auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  549. auto&& atomic = std::atomic<std::uint64_t>{0};
  550. for (auto i = 0; i < numThreads; ++i) {
  551. threads.push_back(DSched::thread([&] {
  552. for (auto j = 0; j < iterations; ++j) {
  553. auto state = mutex.lock();
  554. EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
  555. EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
  556. mutex.unlock(std::move(state));
  557. }
  558. }));
  559. }
  560. for (auto i = 0; i < iterations; ++i) {
  561. if (auto state = mutex.try_lock()) {
  562. EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
  563. EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
  564. mutex.unlock(std::move(state));
  565. }
  566. }
  567. for (auto& thread : threads) {
  568. DSched::join(thread);
  569. }
  570. }
  571. } // namespace
  572. TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
  573. stressTryLockWithConcurrentLocks(2);
  574. }
  575. TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
  576. stressTryLockWithConcurrentLocks(4);
  577. }
  578. TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
  579. stressTryLockWithConcurrentLocks(8);
  580. }
  581. TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
  582. stressTryLockWithConcurrentLocks(16);
  583. }
  584. TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
  585. stressTryLockWithConcurrentLocks(32);
  586. }
  587. TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
  588. stressTryLockWithConcurrentLocks(64);
  589. }
  590. TEST(DistributedMutex, DeterministicTryLockWithLocksTwoThreads) {
  591. auto iterations = numIterationsDeterministicTest(2);
  592. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
  593. for (auto pass = 0; pass < 3; ++pass) {
  594. auto&& schedule = DSched{DSched::uniform(pass)};
  595. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(2, iterations);
  596. static_cast<void>(schedule);
  597. }
  598. }
  599. TEST(DistributedMutex, DeterministicTryLockWithFourThreads) {
  600. auto iterations = numIterationsDeterministicTest(4);
  601. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
  602. for (auto pass = 0; pass < 3; ++pass) {
  603. auto&& schedule = DSched{DSched::uniform(pass)};
  604. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(4, iterations);
  605. static_cast<void>(schedule);
  606. }
  607. }
  608. TEST(DistributedMutex, DeterministicTryLockWithLocksEightThreads) {
  609. auto iterations = numIterationsDeterministicTest(8);
  610. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
  611. for (auto pass = 0; pass < 3; ++pass) {
  612. auto&& schedule = DSched{DSched::uniform(pass)};
  613. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(8, iterations);
  614. static_cast<void>(schedule);
  615. }
  616. }
  617. TEST(DistributedMutex, DeterministicTryLockWithLocksSixteenThreads) {
  618. auto iterations = numIterationsDeterministicTest(16);
  619. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
  620. for (auto pass = 0; pass < 3; ++pass) {
  621. auto&& schedule = DSched{DSched::uniform(pass)};
  622. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(16, iterations);
  623. static_cast<void>(schedule);
  624. }
  625. }
  626. TEST(DistributedMutex, DeterministicTryLockWithLocksThirtyTwoThreads) {
  627. auto iterations = numIterationsDeterministicTest(32);
  628. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
  629. for (auto pass = 0; pass < 3; ++pass) {
  630. auto&& schedule = DSched{DSched::uniform(pass)};
  631. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(32, iterations);
  632. static_cast<void>(schedule);
  633. }
  634. }
  635. TEST(DistributedMutex, DeterministicTryLockWithLocksSixtyFourThreads) {
  636. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
  637. for (auto pass = 0; pass < 3; ++pass) {
  638. auto&& schedule = DSched{DSched::uniform(pass)};
  639. stressTryLockWithConcurrentLocks<test::DeterministicAtomic>(64, 5);
  640. static_cast<void>(schedule);
  641. }
  642. }
  643. namespace {
  644. template <template <typename> class Atom = std::atomic>
  645. void concurrentTryLocks(int numThreads, int iterations = FLAGS_stress_factor) {
  646. auto&& threads = std::vector<std::thread>{};
  647. auto&& mutex = detail::distributed_mutex::DistributedMutex<Atom>{};
  648. auto&& atomic = std::atomic<std::uint64_t>{0};
  649. for (auto i = 0; i < numThreads; ++i) {
  650. threads.push_back(DSched::thread([&] {
  651. for (auto j = 0; j < iterations; ++j) {
  652. if (auto state = mutex.try_lock()) {
  653. EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
  654. EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
  655. mutex.unlock(std::move(state));
  656. }
  657. }
  658. }));
  659. }
  660. for (auto& thread : threads) {
  661. DSched::join(thread);
  662. }
  663. }
  664. } // namespace
  665. TEST(DistributedMutex, StressTryLockWithTwoThreads) {
  666. concurrentTryLocks(2);
  667. }
  668. TEST(DistributedMutex, StressTryLockFourThreads) {
  669. concurrentTryLocks(4);
  670. }
  671. TEST(DistributedMutex, StressTryLockEightThreads) {
  672. concurrentTryLocks(8);
  673. }
  674. TEST(DistributedMutex, StressTryLockSixteenThreads) {
  675. concurrentTryLocks(16);
  676. }
  677. TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
  678. concurrentTryLocks(32);
  679. }
  680. TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
  681. concurrentTryLocks(64);
  682. }
  683. TEST(DistributedMutex, DeterministicTryLockTwoThreads) {
  684. auto iterations = numIterationsDeterministicTest(2);
  685. concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
  686. for (auto pass = 0; pass < 3; ++pass) {
  687. auto&& schedule = DSched{DSched::uniform(pass)};
  688. concurrentTryLocks<test::DeterministicAtomic>(2, iterations);
  689. static_cast<void>(schedule);
  690. }
  691. }
  692. TEST(DistributedMutex, DeterministicTryLockFourThreads) {
  693. auto iterations = numIterationsDeterministicTest(4);
  694. concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
  695. for (auto pass = 0; pass < 3; ++pass) {
  696. auto&& schedule = DSched{DSched::uniform(pass)};
  697. concurrentTryLocks<test::DeterministicAtomic>(4, iterations);
  698. static_cast<void>(schedule);
  699. }
  700. }
  701. TEST(DistributedMutex, DeterministicTryLockEightThreads) {
  702. auto iterations = numIterationsDeterministicTest(8);
  703. concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
  704. for (auto pass = 0; pass < 3; ++pass) {
  705. auto&& schedule = DSched{DSched::uniform(pass)};
  706. concurrentTryLocks<test::DeterministicAtomic>(8, iterations);
  707. static_cast<void>(schedule);
  708. }
  709. }
  710. TEST(DistributedMutex, DeterministicTryLockSixteenThreads) {
  711. auto iterations = numIterationsDeterministicTest(16);
  712. concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
  713. for (auto pass = 0; pass < 3; ++pass) {
  714. auto&& schedule = DSched{DSched::uniform(pass)};
  715. concurrentTryLocks<test::DeterministicAtomic>(16, iterations);
  716. static_cast<void>(schedule);
  717. }
  718. }
  719. TEST(DistributedMutex, DeterministicTryLockThirtyTwoThreads) {
  720. auto iterations = numIterationsDeterministicTest(32);
  721. concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
  722. for (auto pass = 0; pass < 3; ++pass) {
  723. auto&& schedule = DSched{DSched::uniform(pass)};
  724. concurrentTryLocks<test::DeterministicAtomic>(32, iterations);
  725. static_cast<void>(schedule);
  726. }
  727. }
  728. TEST(DistributedMutex, DeterministicTryLockSixtyFourThreads) {
  729. concurrentTryLocks<test::DeterministicAtomic>(64, 5);
  730. for (auto pass = 0; pass < 3; ++pass) {
  731. auto&& schedule = DSched{DSched::uniform(pass)};
  732. concurrentTryLocks<test::DeterministicAtomic>(64, 5);
  733. static_cast<void>(schedule);
  734. }
  735. }
  736. } // namespace folly