DeterministicSchedule.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  #include <folly/test/DeterministicSchedule.h>

  #include <assert.h>

  #include <algorithm>
  #include <cerrno>
  #include <list>
  #include <mutex>
  #include <random>
  #include <unordered_map>
  #include <utility>

  #include <folly/Random.h>
  25. namespace folly {
  26. namespace test {
  27. FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
  28. FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
  29. FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
  30. thread_local AuxAct DeterministicSchedule::tls_aux_act;
  31. AuxChk DeterministicSchedule::aux_chk;
  32. // access is protected by futexLock
  33. static std::unordered_map<
  34. const detail::Futex<DeterministicAtomic>*,
  35. std::list<std::pair<uint32_t, bool*>>>
  36. futexQueues;
  37. static std::mutex futexLock;
  38. DeterministicSchedule::DeterministicSchedule(
  39. const std::function<size_t(size_t)>& scheduler)
  40. : scheduler_(scheduler), nextThreadId_(1), step_(0) {
  41. assert(tls_sem == nullptr);
  42. assert(tls_sched == nullptr);
  43. assert(tls_aux_act == nullptr);
  44. tls_sem = new sem_t;
  45. sem_init(tls_sem, 0, 1);
  46. sems_.push_back(tls_sem);
  47. tls_sched = this;
  48. }
  49. DeterministicSchedule::~DeterministicSchedule() {
  50. assert(tls_sched == this);
  51. assert(sems_.size() == 1);
  52. assert(sems_[0] == tls_sem);
  53. beforeThreadExit();
  54. }
  55. std::function<size_t(size_t)> DeterministicSchedule::uniform(uint64_t seed) {
  56. auto rand = std::make_shared<std::ranlux48>(seed);
  57. return [rand](size_t numActive) {
  58. auto dist = std::uniform_int_distribution<size_t>(0, numActive - 1);
  59. return dist(*rand);
  60. };
  61. }
  62. struct UniformSubset {
  63. UniformSubset(uint64_t seed, size_t subsetSize, size_t stepsBetweenSelect)
  64. : uniform_(DeterministicSchedule::uniform(seed)),
  65. subsetSize_(subsetSize),
  66. stepsBetweenSelect_(stepsBetweenSelect),
  67. stepsLeft_(0) {}
  68. size_t operator()(size_t numActive) {
  69. adjustPermSize(numActive);
  70. if (stepsLeft_-- == 0) {
  71. stepsLeft_ = stepsBetweenSelect_ - 1;
  72. shufflePrefix();
  73. }
  74. return perm_[uniform_(std::min(numActive, subsetSize_))];
  75. }
  76. private:
  77. std::function<size_t(size_t)> uniform_;
  78. const size_t subsetSize_;
  79. const size_t stepsBetweenSelect_;
  80. size_t stepsLeft_;
  81. // only the first subsetSize_ is properly randomized
  82. std::vector<size_t> perm_;
  83. void adjustPermSize(size_t numActive) {
  84. if (perm_.size() > numActive) {
  85. perm_.erase(
  86. std::remove_if(
  87. perm_.begin(),
  88. perm_.end(),
  89. [=](size_t x) { return x >= numActive; }),
  90. perm_.end());
  91. } else {
  92. while (perm_.size() < numActive) {
  93. perm_.push_back(perm_.size());
  94. }
  95. }
  96. assert(perm_.size() == numActive);
  97. }
  98. void shufflePrefix() {
  99. for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
  100. size_t j = uniform_(perm_.size() - i) + i;
  101. std::swap(perm_[i], perm_[j]);
  102. }
  103. }
  104. };
  105. std::function<size_t(size_t)>
  106. DeterministicSchedule::uniformSubset(uint64_t seed, size_t n, size_t m) {
  107. auto gen = std::make_shared<UniformSubset>(seed, n, m);
  108. return [=](size_t numActive) { return (*gen)(numActive); };
  109. }
  110. void DeterministicSchedule::beforeSharedAccess() {
  111. if (tls_sem) {
  112. sem_wait(tls_sem);
  113. }
  114. }
  115. void DeterministicSchedule::afterSharedAccess() {
  116. auto sched = tls_sched;
  117. if (!sched) {
  118. return;
  119. }
  120. sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
  121. }
  122. void DeterministicSchedule::afterSharedAccess(bool success) {
  123. auto sched = tls_sched;
  124. if (!sched) {
  125. return;
  126. }
  127. sched->callAux(success);
  128. sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
  129. }
  130. size_t DeterministicSchedule::getRandNumber(size_t n) {
  131. if (tls_sched) {
  132. return tls_sched->scheduler_(n);
  133. }
  134. return Random::rand32() % n;
  135. }
  136. int DeterministicSchedule::getcpu(
  137. unsigned* cpu,
  138. unsigned* node,
  139. void* /* unused */) {
  140. if (!tls_threadId && tls_sched) {
  141. beforeSharedAccess();
  142. tls_threadId = tls_sched->nextThreadId_++;
  143. afterSharedAccess();
  144. }
  145. if (cpu) {
  146. *cpu = tls_threadId;
  147. }
  148. if (node) {
  149. *node = tls_threadId;
  150. }
  151. return 0;
  152. }
  153. void DeterministicSchedule::setAuxAct(AuxAct& aux) {
  154. tls_aux_act = aux;
  155. }
  156. void DeterministicSchedule::setAuxChk(AuxChk& aux) {
  157. aux_chk = aux;
  158. }
  159. void DeterministicSchedule::clearAuxChk() {
  160. aux_chk = nullptr;
  161. }
  162. void DeterministicSchedule::reschedule(sem_t* sem) {
  163. auto sched = tls_sched;
  164. if (sched) {
  165. sched->sems_.push_back(sem);
  166. }
  167. }
  168. sem_t* DeterministicSchedule::descheduleCurrentThread() {
  169. auto sched = tls_sched;
  170. if (sched) {
  171. sched->sems_.erase(
  172. std::find(sched->sems_.begin(), sched->sems_.end(), tls_sem));
  173. }
  174. return tls_sem;
  175. }
  176. sem_t* DeterministicSchedule::beforeThreadCreate() {
  177. sem_t* s = new sem_t;
  178. sem_init(s, 0, 0);
  179. beforeSharedAccess();
  180. sems_.push_back(s);
  181. afterSharedAccess();
  182. return s;
  183. }
  184. void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
  185. assert(tls_sem == nullptr);
  186. assert(tls_sched == nullptr);
  187. tls_sem = sem;
  188. tls_sched = this;
  189. bool started = false;
  190. while (!started) {
  191. beforeSharedAccess();
  192. if (active_.count(std::this_thread::get_id()) == 1) {
  193. started = true;
  194. }
  195. afterSharedAccess();
  196. }
  197. }
  198. void DeterministicSchedule::beforeThreadExit() {
  199. assert(tls_sched == this);
  200. beforeSharedAccess();
  201. auto parent = joins_.find(std::this_thread::get_id());
  202. if (parent != joins_.end()) {
  203. reschedule(parent->second);
  204. joins_.erase(parent);
  205. }
  206. sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
  207. active_.erase(std::this_thread::get_id());
  208. if (sems_.size() > 0) {
  209. FOLLY_TEST_DSCHED_VLOG("exiting");
  210. afterSharedAccess();
  211. }
  212. sem_destroy(tls_sem);
  213. delete tls_sem;
  214. tls_sem = nullptr;
  215. tls_sched = nullptr;
  216. tls_aux_act = nullptr;
  217. }
  218. void DeterministicSchedule::join(std::thread& child) {
  219. auto sched = tls_sched;
  220. if (sched) {
  221. beforeSharedAccess();
  222. assert(sched->joins_.count(child.get_id()) == 0);
  223. if (sched->active_.count(child.get_id())) {
  224. sem_t* sem = descheduleCurrentThread();
  225. sched->joins_.insert({child.get_id(), sem});
  226. afterSharedAccess();
  227. // Wait to be scheduled by exiting child thread
  228. beforeSharedAccess();
  229. assert(!sched->active_.count(child.get_id()));
  230. }
  231. afterSharedAccess();
  232. }
  233. FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
  234. child.join();
  235. }
  236. void DeterministicSchedule::callAux(bool success) {
  237. ++step_;
  238. if (tls_aux_act) {
  239. tls_aux_act(success);
  240. tls_aux_act = nullptr;
  241. }
  242. if (aux_chk) {
  243. aux_chk(step_);
  244. }
  245. }
  246. void DeterministicSchedule::post(sem_t* sem) {
  247. beforeSharedAccess();
  248. sem_post(sem);
  249. FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
  250. afterSharedAccess();
  251. }
  252. bool DeterministicSchedule::tryWait(sem_t* sem) {
  253. beforeSharedAccess();
  254. int rv = sem_trywait(sem);
  255. int e = rv == 0 ? 0 : errno;
  256. FOLLY_TEST_DSCHED_VLOG(
  257. "sem_trywait(" << sem << ") = " << rv << " errno=" << e);
  258. afterSharedAccess();
  259. if (rv == 0) {
  260. return true;
  261. } else {
  262. assert(e == EAGAIN);
  263. return false;
  264. }
  265. }
  266. void DeterministicSchedule::wait(sem_t* sem) {
  267. while (!tryWait(sem)) {
  268. // we're not busy waiting because this is a deterministic schedule
  269. }
  270. }
  271. detail::FutexResult futexWaitImpl(
  272. const detail::Futex<DeterministicAtomic>* futex,
  273. uint32_t expected,
  274. std::chrono::system_clock::time_point const* absSystemTimeout,
  275. std::chrono::steady_clock::time_point const* absSteadyTimeout,
  276. uint32_t waitMask) {
  277. using namespace test;
  278. using namespace std::chrono;
  279. using namespace folly::detail;
  280. bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
  281. bool awoken = false;
  282. FutexResult result = FutexResult::AWOKEN;
  283. DeterministicSchedule::beforeSharedAccess();
  284. FOLLY_TEST_DSCHED_VLOG(
  285. "futexWait(" << futex << ", " << std::hex << expected << ", .., "
  286. << std::hex << waitMask << ") beginning..");
  287. futexLock.lock();
  288. if (futex->load_direct() == expected) {
  289. auto& queue = futexQueues[futex];
  290. queue.emplace_back(waitMask, &awoken);
  291. auto ours = queue.end();
  292. ours--;
  293. while (!awoken) {
  294. futexLock.unlock();
  295. DeterministicSchedule::afterSharedAccess();
  296. DeterministicSchedule::beforeSharedAccess();
  297. futexLock.lock();
  298. // Simulate spurious wake-ups, timeouts each time with
  299. // a 10% probability if we haven't been woken up already
  300. if (!awoken && hasTimeout &&
  301. DeterministicSchedule::getRandNumber(100) < 10) {
  302. assert(futexQueues.count(futex) != 0 && &futexQueues[futex] == &queue);
  303. queue.erase(ours);
  304. if (queue.empty()) {
  305. futexQueues.erase(futex);
  306. }
  307. // Simulate ETIMEDOUT 90% of the time and other failures
  308. // remaining time
  309. result = DeterministicSchedule::getRandNumber(100) >= 10
  310. ? FutexResult::TIMEDOUT
  311. : FutexResult::INTERRUPTED;
  312. break;
  313. }
  314. }
  315. } else {
  316. result = FutexResult::VALUE_CHANGED;
  317. }
  318. futexLock.unlock();
  319. char const* resultStr = "?";
  320. switch (result) {
  321. case FutexResult::AWOKEN:
  322. resultStr = "AWOKEN";
  323. break;
  324. case FutexResult::TIMEDOUT:
  325. resultStr = "TIMEDOUT";
  326. break;
  327. case FutexResult::INTERRUPTED:
  328. resultStr = "INTERRUPTED";
  329. break;
  330. case FutexResult::VALUE_CHANGED:
  331. resultStr = "VALUE_CHANGED";
  332. break;
  333. }
  334. FOLLY_TEST_DSCHED_VLOG(
  335. "futexWait(" << futex << ", " << std::hex << expected << ", .., "
  336. << std::hex << waitMask << ") -> " << resultStr);
  337. DeterministicSchedule::afterSharedAccess();
  338. return result;
  339. }
  340. int futexWakeImpl(
  341. const detail::Futex<test::DeterministicAtomic>* futex,
  342. int count,
  343. uint32_t wakeMask) {
  344. using namespace test;
  345. using namespace std::chrono;
  346. int rv = 0;
  347. DeterministicSchedule::beforeSharedAccess();
  348. futexLock.lock();
  349. if (futexQueues.count(futex) > 0) {
  350. auto& queue = futexQueues[futex];
  351. auto iter = queue.begin();
  352. while (iter != queue.end() && rv < count) {
  353. auto cur = iter++;
  354. if ((cur->first & wakeMask) != 0) {
  355. *(cur->second) = true;
  356. rv++;
  357. queue.erase(cur);
  358. }
  359. }
  360. if (queue.empty()) {
  361. futexQueues.erase(futex);
  362. }
  363. }
  364. futexLock.unlock();
  365. FOLLY_TEST_DSCHED_VLOG(
  366. "futexWake(" << futex << ", " << count << ", " << std::hex << wakeMask
  367. << ") -> " << rv);
  368. DeterministicSchedule::afterSharedAccess();
  369. return rv;
  370. }
  371. } // namespace test
  372. } // namespace folly
  373. namespace folly {
  374. template <>
  375. CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  376. static CacheLocality cache(CacheLocality::uniform(16));
  377. return cache;
  378. }
  379. template <>
  380. Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
  381. return &test::DeterministicSchedule::getcpu;
  382. }
  383. } // namespace folly