AtomicHashMapTest.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. /*
  2. * Copyright 2012-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include <folly/AtomicHashMap.h>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <random>
#include <thread>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/Conv.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysTime.h>
  25. using folly::AtomicHashArray;
  26. using folly::AtomicHashMap;
  27. using folly::StringPiece;
  28. using std::string;
  29. using std::vector;
  30. // Tunables:
  31. DEFINE_double(targetLoadFactor, 0.75, "Target memory utilization fraction.");
  32. DEFINE_double(maxLoadFactor, 0.80, "Max before growth.");
  33. DEFINE_int32(numThreads, 8, "Threads to use for concurrency tests.");
  34. DEFINE_int64(numBMElements, 12 * 1000 * 1000, "Size of maps for benchmarks.");
  35. const double LF = FLAGS_maxLoadFactor / FLAGS_targetLoadFactor;
  36. const int maxBMElements = int(FLAGS_numBMElements * LF); // hit our target LF.
  37. static int64_t nowInUsec() {
  38. timeval tv;
  39. gettimeofday(&tv, nullptr);
  40. return int64_t(tv.tv_sec) * 1000 * 1000 + tv.tv_usec;
  41. }
  42. TEST(Ahm, BasicStrings) {
  43. typedef AtomicHashMap<int64_t, string> AHM;
  44. AHM myMap(1024);
  45. EXPECT_TRUE(myMap.begin() == myMap.end());
  46. for (int i = 0; i < 100; ++i) {
  47. myMap.insert(make_pair(i, folly::to<string>(i)));
  48. }
  49. for (int i = 0; i < 100; ++i) {
  50. EXPECT_EQ(myMap.find(i)->second, folly::to<string>(i));
  51. }
  52. myMap.insert(std::make_pair(999, "A"));
  53. myMap.insert(std::make_pair(999, "B"));
  54. EXPECT_EQ(myMap.find(999)->second, "A"); // shouldn't have overwritten
  55. myMap.find(999)->second = "B";
  56. myMap.find(999)->second = "C";
  57. EXPECT_EQ(myMap.find(999)->second, "C");
  58. EXPECT_EQ(myMap.find(999)->first, 999);
  59. }
  60. TEST(Ahm, BasicNoncopyable) {
  61. typedef AtomicHashMap<int64_t, std::unique_ptr<int>> AHM;
  62. AHM myMap(1024);
  63. EXPECT_TRUE(myMap.begin() == myMap.end());
  64. for (int i = 0; i < 50; ++i) {
  65. myMap.insert(make_pair(i, std::make_unique<int>(i)));
  66. }
  67. for (int i = 50; i < 100; ++i) {
  68. myMap.insert(i, std::make_unique<int>(i));
  69. }
  70. for (int i = 100; i < 150; ++i) {
  71. myMap.emplace(i, new int(i));
  72. }
  73. for (int i = 150; i < 200; ++i) {
  74. myMap.emplace(i, new int(i), std::default_delete<int>());
  75. }
  76. for (int i = 0; i < 200; ++i) {
  77. EXPECT_EQ(*(myMap.find(i)->second), i);
  78. }
  79. for (int i = 0; i < 200; i += 4) {
  80. myMap.erase(i);
  81. }
  82. for (int i = 0; i < 200; i += 4) {
  83. EXPECT_EQ(myMap.find(i), myMap.end());
  84. }
  85. }
  86. typedef int32_t KeyT;
  87. typedef int32_t ValueT;
  88. typedef AtomicHashMap<KeyT, ValueT> AHMapT;
  89. typedef AHMapT::value_type RecordT;
  90. typedef AtomicHashArray<KeyT, ValueT> AHArrayT;
  91. AHArrayT::Config config;
  92. typedef folly::QuadraticProbingAtomicHashMap<KeyT, ValueT> QPAHMapT;
  93. QPAHMapT::Config qpConfig;
  94. static AHArrayT::SmartPtr globalAHA(nullptr);
  95. static std::unique_ptr<AHMapT> globalAHM;
  96. static std::unique_ptr<QPAHMapT> globalQPAHM;
  97. // Generate a deterministic value based on an input key
  98. static int genVal(int key) {
  99. return key / 3;
  100. }
  101. static bool legalKey(const char* a);
  102. struct EqTraits {
  103. bool operator()(const char* a, const char* b) {
  104. return legalKey(a) && (strcmp(a, b) == 0);
  105. }
  106. bool operator()(const char* a, const char& b) {
  107. return legalKey(a) && (a[0] != '\0') && (a[0] == b);
  108. }
  109. bool operator()(const char* a, const StringPiece b) {
  110. return legalKey(a) && (strlen(a) == b.size()) &&
  111. (strcmp(a, b.begin()) == 0);
  112. }
  113. };
  114. struct HashTraits {
  115. size_t operator()(const char* a) {
  116. size_t result = 0;
  117. while (a[0] != 0) {
  118. result += static_cast<size_t>(*(a++));
  119. }
  120. return result;
  121. }
  122. size_t operator()(const char& a) {
  123. return static_cast<size_t>(a);
  124. }
  125. size_t operator()(const StringPiece a) {
  126. size_t result = 0;
  127. for (const auto& ch : a) {
  128. result += static_cast<size_t>(ch);
  129. }
  130. return result;
  131. }
  132. };
  133. typedef AtomicHashMap<const char*, int64_t, HashTraits, EqTraits> AHMCstrInt;
  134. AHMCstrInt::Config cstrIntCfg;
  135. static bool legalKey(const char* a) {
  136. return a != cstrIntCfg.emptyKey && a != cstrIntCfg.lockedKey &&
  137. a != cstrIntCfg.erasedKey;
  138. }
  139. TEST(Ahm, BasicLookup) {
  140. AHMCstrInt myMap(1024, cstrIntCfg);
  141. EXPECT_TRUE(myMap.begin() == myMap.end());
  142. myMap.insert(std::make_pair("f", 42));
  143. EXPECT_EQ(42, myMap.find("f")->second);
  144. {
  145. // Look up a single char, successfully.
  146. auto it = myMap.find<char>('f');
  147. EXPECT_EQ(42, it->second);
  148. }
  149. {
  150. // Look up a single char, unsuccessfully.
  151. auto it = myMap.find<char>('g');
  152. EXPECT_TRUE(it == myMap.end());
  153. }
  154. {
  155. // Look up a string piece, successfully.
  156. const StringPiece piece("f");
  157. auto it = myMap.find(piece);
  158. EXPECT_EQ(42, it->second);
  159. }
  160. }
  161. TEST(Ahm, grow) {
  162. VLOG(1) << "Overhead: " << sizeof(AHArrayT) << " (array) "
  163. << sizeof(AHMapT) + sizeof(AHArrayT) << " (map/set) Bytes.";
  164. uint64_t numEntries = 10000;
  165. float sizeFactor = 0.46f;
  166. std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));
  167. // load map - make sure we succeed and the index is accurate
  168. bool success = true;
  169. for (uint64_t i = 0; i < numEntries; i++) {
  170. auto ret = m->insert(RecordT(i, genVal(i)));
  171. success &= ret.second;
  172. success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
  173. }
  174. // Overwrite vals to make sure there are no dups
  175. // Every insert should fail because the keys are already in the map.
  176. success = true;
  177. for (uint64_t i = 0; i < numEntries; i++) {
  178. auto ret = m->insert(RecordT(i, genVal(i * 2)));
  179. success &= (ret.second == false); // fail on collision
  180. success &= (ret.first->second == genVal(i)); // return the previous value
  181. success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
  182. }
  183. EXPECT_TRUE(success);
  184. // check correctness
  185. EXPECT_GT(m->numSubMaps(), 1); // make sure we grew
  186. success = true;
  187. EXPECT_EQ(m->size(), numEntries);
  188. for (size_t i = 0; i < numEntries; i++) {
  189. success &= (m->find(i)->second == genVal(i));
  190. }
  191. EXPECT_TRUE(success);
  192. // Check findAt
  193. success = true;
  194. AHMapT::const_iterator retIt;
  195. for (int32_t i = 0; i < int32_t(numEntries); i++) {
  196. retIt = m->find(i);
  197. retIt = m->findAt(retIt.getIndex());
  198. success &= (retIt->second == genVal(i));
  199. // We use a uint32_t index so that this comparison is between two
  200. // variables of the same type.
  201. success &= (retIt->first == i);
  202. }
  203. EXPECT_TRUE(success);
  204. // Try modifying value
  205. m->find(8)->second = 5309;
  206. EXPECT_EQ(m->find(8)->second, 5309);
  207. // check clear()
  208. m->clear();
  209. success = true;
  210. for (uint64_t i = 0; i < numEntries / 2; i++) {
  211. success &= m->insert(RecordT(i, genVal(i))).second;
  212. }
  213. EXPECT_TRUE(success);
  214. EXPECT_EQ(m->size(), numEntries / 2);
  215. }
  216. TEST(Ahm, iterator) {
  217. int numEntries = 10000;
  218. float sizeFactor = .46f;
  219. std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));
  220. // load map - make sure we succeed and the index is accurate
  221. for (int i = 0; i < numEntries; i++) {
  222. m->insert(RecordT(i, genVal(i)));
  223. }
  224. bool success = true;
  225. int count = 0;
  226. FOR_EACH (it, *m) {
  227. success &= (it->second == genVal(it->first));
  228. ++count;
  229. }
  230. EXPECT_TRUE(success);
  231. EXPECT_EQ(count, numEntries);
  232. }
  233. class Counters {
  234. private:
  235. // Note: Unfortunately can't currently put a std::atomic<int64_t> in
  236. // the value in ahm since it doesn't support types that are both non-copy
  237. // and non-move constructible yet.
  238. AtomicHashMap<int64_t, int64_t> ahm;
  239. public:
  240. explicit Counters(size_t numCounters) : ahm(numCounters) {}
  241. void increment(int64_t obj_id) {
  242. auto ret = ahm.insert(std::make_pair(obj_id, 1));
  243. if (!ret.second) {
  244. // obj_id already exists, increment count
  245. __sync_fetch_and_add(&ret.first->second, 1);
  246. }
  247. }
  248. int64_t getValue(int64_t obj_id) {
  249. auto ret = ahm.find(obj_id);
  250. return ret != ahm.end() ? ret->second : 0;
  251. }
  252. // export the counters without blocking increments
  253. string toString() {
  254. string ret = "{\n";
  255. ret.reserve(ahm.size() * 32);
  256. for (const auto& e : ahm) {
  257. ret += folly::to<string>(" [", e.first, ":", e.second, "]\n");
  258. }
  259. ret += "}\n";
  260. return ret;
  261. }
  262. };
  263. // If you get an error "terminate called without an active exception", there
  264. // might be too many threads getting created - decrease numKeys and/or mult.
  265. TEST(Ahm, counter) {
  266. const int numKeys = 10;
  267. const int mult = 10;
  268. Counters c(numKeys);
  269. vector<int64_t> keys;
  270. FOR_EACH_RANGE (i, 1, numKeys) { keys.push_back(i); }
  271. vector<std::thread> threads;
  272. for (auto key : keys) {
  273. FOR_EACH_RANGE (i, 0, key * mult) {
  274. threads.push_back(std::thread([&, key] { c.increment(key); }));
  275. }
  276. }
  277. for (auto& t : threads) {
  278. t.join();
  279. }
  280. string str = c.toString();
  281. for (auto key : keys) {
  282. int val = key * mult;
  283. EXPECT_EQ(val, c.getValue(key));
  284. EXPECT_NE(
  285. string::npos, str.find(folly::to<string>("[", key, ":", val, "]")));
  286. }
  287. }
  288. class Integer {
  289. public:
  290. explicit Integer(KeyT v = 0) : v_(v) {}
  291. Integer& operator=(const Integer& a) {
  292. static bool throwException_ = false;
  293. throwException_ = !throwException_;
  294. if (throwException_) {
  295. throw 1;
  296. }
  297. v_ = a.v_;
  298. return *this;
  299. }
  300. bool operator==(const Integer& a) const {
  301. return v_ == a.v_;
  302. }
  303. private:
  304. KeyT v_;
  305. };
  306. TEST(Ahm, map_exception_safety) {
  307. typedef AtomicHashMap<KeyT, Integer> MyMapT;
  308. int numEntries = 10000;
  309. float sizeFactor = 0.46f;
  310. std::unique_ptr<MyMapT> m(new MyMapT(int(numEntries * sizeFactor)));
  311. bool success = true;
  312. int count = 0;
  313. for (int i = 0; i < numEntries; i++) {
  314. try {
  315. m->insert(i, Integer(genVal(i)));
  316. success &= (m->find(i)->second == Integer(genVal(i)));
  317. ++count;
  318. } catch (...) {
  319. success &= !m->count(i);
  320. }
  321. }
  322. EXPECT_EQ(count, m->size());
  323. EXPECT_TRUE(success);
  324. }
  325. TEST(Ahm, basicErase) {
  326. size_t numEntries = 3000;
  327. std::unique_ptr<AHMapT> s(new AHMapT(numEntries, config));
  328. // Iterate filling up the map and deleting all keys a few times
  329. // to test more than one subMap.
  330. for (int iterations = 0; iterations < 4; ++iterations) {
  331. // Testing insertion of keys
  332. bool success = true;
  333. for (size_t i = 0; i < numEntries; ++i) {
  334. success &= !(s->count(i));
  335. auto ret = s->insert(RecordT(i, i));
  336. success &= s->count(i);
  337. success &= ret.second;
  338. }
  339. EXPECT_TRUE(success);
  340. EXPECT_EQ(s->size(), numEntries);
  341. // Delete every key in the map and verify that the key is gone and the the
  342. // size is correct.
  343. success = true;
  344. for (size_t i = 0; i < numEntries; ++i) {
  345. success &= s->erase(i);
  346. success &= (s->size() == numEntries - 1 - i);
  347. success &= !(s->count(i));
  348. success &= !(s->erase(i));
  349. }
  350. EXPECT_TRUE(success);
  351. }
  352. VLOG(1) << "Final number of subMaps = " << s->numSubMaps();
  353. }
  354. namespace {
  355. inline KeyT randomizeKey(int key) {
  356. // We deterministically randomize the key to more accurately simulate
  357. // real-world usage, and to avoid pathalogical performance patterns (e.g.
  358. // those related to std::hash<int64_t>()(1) == 1).
  359. //
  360. // Use a hash function we don't normally use for ints to avoid interactions.
  361. return folly::hash::jenkins_rev_mix32(key);
  362. }
  363. int numOpsPerThread = 0;
  364. void* insertThread(void* jj) {
  365. int64_t j = (int64_t)jj;
  366. for (int i = 0; i < numOpsPerThread; ++i) {
  367. KeyT key = randomizeKey(i + j * numOpsPerThread);
  368. globalAHM->insert(key, genVal(key));
  369. }
  370. return nullptr;
  371. }
  372. void* qpInsertThread(void* jj) {
  373. int64_t j = (int64_t)jj;
  374. for (int i = 0; i < numOpsPerThread; ++i) {
  375. KeyT key = randomizeKey(i + j * numOpsPerThread);
  376. globalQPAHM->insert(key, genVal(key));
  377. }
  378. return nullptr;
  379. }
  380. void* insertThreadArr(void* jj) {
  381. int64_t j = (int64_t)jj;
  382. for (int i = 0; i < numOpsPerThread; ++i) {
  383. KeyT key = randomizeKey(i + j * numOpsPerThread);
  384. globalAHA->insert(std::make_pair(key, genVal(key)));
  385. }
  386. return nullptr;
  387. }
  388. std::atomic<bool> runThreadsCreatedAllThreads;
  389. void runThreads(void* (*mainFunc)(void*), int numThreads, void** statuses) {
  390. folly::BenchmarkSuspender susp;
  391. runThreadsCreatedAllThreads.store(false);
  392. vector<std::thread> threads;
  393. for (int64_t j = 0; j < numThreads; j++) {
  394. threads.emplace_back([statuses, mainFunc, j]() {
  395. auto ret = mainFunc((void*)j);
  396. if (statuses != nullptr) {
  397. statuses[j] = ret;
  398. }
  399. });
  400. }
  401. susp.dismiss();
  402. runThreadsCreatedAllThreads.store(true);
  403. for (size_t i = 0; i < threads.size(); ++i) {
  404. threads[i].join();
  405. }
  406. }
  407. void runThreads(void* (*mainFunc)(void*)) {
  408. runThreads(mainFunc, FLAGS_numThreads, nullptr);
  409. }
  410. } // namespace
  411. TEST(Ahm, collision_test) {
  412. const int numInserts = 1000000 / 4;
  413. // Doing the same number on each thread so we collide.
  414. numOpsPerThread = numInserts;
  415. float sizeFactor = 0.46f;
  416. int entrySize = sizeof(KeyT) + sizeof(ValueT);
  417. VLOG(1) << "Testing " << numInserts << " unique " << entrySize
  418. << " Byte entries replicated in " << FLAGS_numThreads
  419. << " threads with " << FLAGS_maxLoadFactor * 100.0
  420. << "% max load factor.";
  421. globalAHM = std::make_unique<AHMapT>(int(numInserts * sizeFactor), config);
  422. size_t sizeInit = globalAHM->capacity();
  423. VLOG(1) << " Initial capacity: " << sizeInit;
  424. double start = nowInUsec();
  425. runThreads([](void*) -> void* { // collisionInsertThread
  426. for (int i = 0; i < numOpsPerThread; ++i) {
  427. KeyT key = randomizeKey(i);
  428. globalAHM->insert(key, genVal(key));
  429. }
  430. return nullptr;
  431. });
  432. double elapsed = nowInUsec() - start;
  433. size_t finalCap = globalAHM->capacity();
  434. size_t sizeAHM = globalAHM->size();
  435. VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
  436. << " duplicate inserts (atomic).";
  437. VLOG(1) << " Final capacity: " << finalCap << " in "
  438. << globalAHM->numSubMaps() << " sub maps ("
  439. << sizeAHM * 100 / finalCap << "% load factor, "
  440. << (finalCap - sizeInit) * 100 / sizeInit << "% growth).";
  441. // check correctness
  442. EXPECT_EQ(sizeAHM, numInserts);
  443. bool success = true;
  444. for (int i = 0; i < numInserts; ++i) {
  445. KeyT key = randomizeKey(i);
  446. success &= (globalAHM->find(key)->second == genVal(key));
  447. }
  448. EXPECT_TRUE(success);
  449. // check colliding finds
  450. start = nowInUsec();
  451. runThreads([](void*) -> void* { // collisionFindThread
  452. KeyT key(0);
  453. for (int i = 0; i < numOpsPerThread; ++i) {
  454. globalAHM->find(key);
  455. }
  456. return nullptr;
  457. });
  458. elapsed = nowInUsec() - start;
  459. VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
  460. << " duplicate finds (atomic).";
  461. }
  462. namespace {
  463. const int kInsertPerThread = 100000;
  464. int raceFinalSizeEstimate;
  465. void* raceIterateThread(void*) {
  466. int count = 0;
  467. AHMapT::iterator it = globalAHM->begin();
  468. AHMapT::iterator end = globalAHM->end();
  469. for (; it != end; ++it) {
  470. ++count;
  471. if (count > raceFinalSizeEstimate) {
  472. EXPECT_FALSE("Infinite loop in iterator.");
  473. return nullptr;
  474. }
  475. }
  476. return nullptr;
  477. }
  478. void* raceInsertRandomThread(void*) {
  479. for (int i = 0; i < kInsertPerThread; ++i) {
  480. KeyT key = rand();
  481. globalAHM->insert(key, genVal(key));
  482. }
  483. return nullptr;
  484. }
  485. } // namespace
  486. // Test for race conditions when inserting and iterating at the same time and
  487. // creating multiple submaps.
  488. TEST(Ahm, race_insert_iterate_thread_test) {
  489. const int kInsertThreads = 20;
  490. const int kIterateThreads = 20;
  491. raceFinalSizeEstimate = kInsertThreads * kInsertPerThread;
  492. VLOG(1) << "Testing iteration and insertion with " << kInsertThreads
  493. << " threads inserting and " << kIterateThreads
  494. << " threads iterating.";
  495. globalAHM = std::make_unique<AHMapT>(raceFinalSizeEstimate / 9, config);
  496. vector<pthread_t> threadIds;
  497. for (int j = 0; j < kInsertThreads + kIterateThreads; j++) {
  498. pthread_t tid;
  499. void* (*thread)(void*) =
  500. (j < kInsertThreads ? raceInsertRandomThread : raceIterateThread);
  501. if (pthread_create(&tid, nullptr, thread, nullptr) != 0) {
  502. LOG(ERROR) << "Could not start thread";
  503. } else {
  504. threadIds.push_back(tid);
  505. }
  506. }
  507. for (size_t i = 0; i < threadIds.size(); ++i) {
  508. pthread_join(threadIds[i], nullptr);
  509. }
  510. VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
  511. VLOG(1) << "Final size of map " << globalAHM->size();
  512. }
  513. namespace {
  514. const int kTestEraseInsertions = 200000;
  515. std::atomic<int32_t> insertedLevel;
  516. void* testEraseInsertThread(void*) {
  517. for (int i = 0; i < kTestEraseInsertions; ++i) {
  518. KeyT key = randomizeKey(i);
  519. globalAHM->insert(key, genVal(key));
  520. insertedLevel.store(i, std::memory_order_release);
  521. }
  522. insertedLevel.store(kTestEraseInsertions, std::memory_order_release);
  523. return nullptr;
  524. }
  525. void* testEraseEraseThread(void*) {
  526. for (int i = 0; i < kTestEraseInsertions; ++i) {
  527. /*
  528. * Make sure that we don't get ahead of the insert thread, because
  529. * part of the condition for this unit test succeeding is that the
  530. * map ends up empty.
  531. *
  532. * Note, there is a subtle case here when a new submap is
  533. * allocated: the erasing thread might get 0 from count(key)
  534. * because it hasn't seen numSubMaps_ update yet. To avoid this
  535. * race causing problems for the test (it's ok for real usage), we
  536. * lag behind the inserter by more than just element.
  537. */
  538. const int lag = 10;
  539. int currentLevel;
  540. do {
  541. currentLevel = insertedLevel.load(std::memory_order_acquire);
  542. if (currentLevel == kTestEraseInsertions) {
  543. currentLevel += lag + 1;
  544. }
  545. } while (currentLevel - lag < i);
  546. KeyT key = randomizeKey(i);
  547. while (globalAHM->count(key)) {
  548. if (globalAHM->erase(key)) {
  549. break;
  550. }
  551. }
  552. }
  553. return nullptr;
  554. }
  555. } // namespace
  556. // Here we have a single thread inserting some values, and several threads
  557. // racing to delete the values in the order they were inserted.
  558. TEST(Ahm, thread_erase_insert_race) {
  559. const int kInsertThreads = 1;
  560. const int kEraseThreads = 10;
  561. VLOG(1) << "Testing insertion and erase with " << kInsertThreads
  562. << " thread inserting and " << kEraseThreads << " threads erasing.";
  563. globalAHM = std::make_unique<AHMapT>(kTestEraseInsertions / 4, config);
  564. vector<pthread_t> threadIds;
  565. for (int64_t j = 0; j < kInsertThreads + kEraseThreads; j++) {
  566. pthread_t tid;
  567. void* (*thread)(void*) =
  568. (j < kInsertThreads ? testEraseInsertThread : testEraseEraseThread);
  569. if (pthread_create(&tid, nullptr, thread, (void*)j) != 0) {
  570. LOG(ERROR) << "Could not start thread";
  571. } else {
  572. threadIds.push_back(tid);
  573. }
  574. }
  575. for (size_t i = 0; i < threadIds.size(); i++) {
  576. pthread_join(threadIds[i], nullptr);
  577. }
  578. EXPECT_TRUE(globalAHM->empty());
  579. EXPECT_EQ(globalAHM->size(), 0);
  580. VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
  581. }
  582. // Repro for T#483734: Duplicate AHM inserts due to incorrect AHA return value.
  583. typedef AtomicHashArray<int32_t, int32_t> AHA;
  584. AHA::Config configRace;
  585. auto atomicHashArrayInsertRaceArray = AHA::create(2, configRace);
  586. void* atomicHashArrayInsertRaceThread(void* /* j */) {
  587. AHA* arr = atomicHashArrayInsertRaceArray.get();
  588. uintptr_t numInserted = 0;
  589. while (!runThreadsCreatedAllThreads.load()) {
  590. ;
  591. }
  592. for (int i = 0; i < 2; i++) {
  593. if (arr->insert(RecordT(randomizeKey(i), 0)).first != arr->end()) {
  594. numInserted++;
  595. }
  596. }
  597. return (void*)numInserted;
  598. }
  599. TEST(Ahm, atomic_hash_array_insert_race) {
  600. AHA* arr = atomicHashArrayInsertRaceArray.get();
  601. int numIterations = 5000;
  602. constexpr int numThreads = 4;
  603. void* statuses[numThreads];
  604. for (int i = 0; i < numIterations; i++) {
  605. arr->clear();
  606. runThreads(atomicHashArrayInsertRaceThread, numThreads, statuses);
  607. EXPECT_GE(arr->size(), 1);
  608. for (int j = 0; j < numThreads; j++) {
  609. EXPECT_EQ(arr->size(), uintptr_t(statuses[j]));
  610. }
  611. }
  612. }
  613. // Repro for T#5841499. Race between erase() and find() on the same key.
  614. TEST(Ahm, erase_find_race) {
  615. const uint64_t limit = 10000;
  616. AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
  617. std::atomic<uint64_t> key{1};
  618. // Invariant: all values are equal to their keys.
  619. // At any moment there is one or two consecutive keys in the map.
  620. std::thread write_thread([&]() {
  621. while (true) {
  622. uint64_t k = ++key;
  623. if (k > limit) {
  624. break;
  625. }
  626. map.insert(k + 1, k + 1);
  627. map.erase(k);
  628. }
  629. });
  630. std::thread read_thread([&]() {
  631. while (true) {
  632. uint64_t k = key.load();
  633. if (k > limit) {
  634. break;
  635. }
  636. auto it = map.find(k);
  637. if (it != map.end()) {
  638. ASSERT_EQ(k, it->second);
  639. }
  640. }
  641. });
  642. read_thread.join();
  643. write_thread.join();
  644. }
  645. // Erase right after insert race bug repro (t9130653)
  646. TEST(Ahm, erase_after_insert_race) {
  647. const uint64_t limit = 10000;
  648. const size_t num_threads = 100;
  649. const size_t num_iters = 500;
  650. AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
  651. std::atomic<bool> go{false};
  652. std::vector<std::thread> ts;
  653. for (size_t i = 0; i < num_threads; ++i) {
  654. ts.emplace_back([&]() {
  655. while (!go) {
  656. continue;
  657. }
  658. for (size_t n = 0; n < num_iters; ++n) {
  659. map.erase(1);
  660. map.insert(1, 1);
  661. }
  662. });
  663. }
  664. go = true;
  665. for (auto& t : ts) {
  666. t.join();
  667. }
  668. }
  669. // Repro for a bug when iterator didn't skip empty submaps.
  670. TEST(Ahm, iterator_skips_empty_submaps) {
  671. AtomicHashMap<uint64_t, uint64_t>::Config conf;
  672. conf.growthFactor = 1;
  673. AtomicHashMap<uint64_t, uint64_t> map(1, conf);
  674. map.insert(1, 1);
  675. map.insert(2, 2);
  676. map.insert(3, 3);
  677. map.erase(2);
  678. auto it = map.find(1);
  679. ASSERT_NE(map.end(), it);
  680. ASSERT_EQ(1, it->first);
  681. ASSERT_EQ(1, it->second);
  682. ++it;
  683. ASSERT_NE(map.end(), it);
  684. ASSERT_EQ(3, it->first);
  685. ASSERT_EQ(3, it->second);
  686. ++it;
  687. ASSERT_EQ(map.end(), it);
  688. }
  689. namespace {
  690. void loadGlobalAha() {
  691. std::cout << "loading global AHA with " << FLAGS_numThreads
  692. << " threads...\n";
  693. uint64_t start = nowInUsec();
  694. globalAHA = AHArrayT::create(maxBMElements, config);
  695. numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
  696. CHECK_EQ(0, FLAGS_numBMElements % FLAGS_numThreads)
  697. << "kNumThreads must evenly divide kNumInserts.";
  698. runThreads(insertThreadArr);
  699. uint64_t elapsed = nowInUsec() - start;
  700. std::cout << " took " << elapsed / 1000 << " ms ("
  701. << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
  702. EXPECT_EQ(globalAHA->size(), FLAGS_numBMElements);
  703. }
  704. void loadGlobalAhm() {
  705. std::cout << "loading global AHM with " << FLAGS_numThreads
  706. << " threads...\n";
  707. uint64_t start = nowInUsec();
  708. globalAHM = std::make_unique<AHMapT>(maxBMElements, config);
  709. numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
  710. runThreads(insertThread);
  711. uint64_t elapsed = nowInUsec() - start;
  712. std::cout << " took " << elapsed / 1000 << " ms ("
  713. << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
  714. EXPECT_EQ(globalAHM->size(), FLAGS_numBMElements);
  715. }
  716. void loadGlobalQPAhm() {
  717. std::cout << "loading global QPAHM with " << FLAGS_numThreads
  718. << " threads...\n";
  719. uint64_t start = nowInUsec();
  720. globalQPAHM = std::make_unique<QPAHMapT>(maxBMElements, qpConfig);
  721. numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
  722. runThreads(qpInsertThread);
  723. uint64_t elapsed = nowInUsec() - start;
  724. std::cout << " took " << elapsed / 1000 << " ms ("
  725. << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
  726. EXPECT_EQ(globalQPAHM->size(), FLAGS_numBMElements);
  727. }
  728. } // namespace
  729. BENCHMARK(st_aha_find, iters) {
  730. CHECK_LE(iters, FLAGS_numBMElements);
  731. for (size_t i = 0; i < iters; i++) {
  732. KeyT key = randomizeKey(i);
  733. folly::doNotOptimizeAway(globalAHA->find(key)->second);
  734. }
  735. }
  736. BENCHMARK(st_ahm_find, iters) {
  737. CHECK_LE(iters, FLAGS_numBMElements);
  738. for (size_t i = 0; i < iters; i++) {
  739. KeyT key = randomizeKey(i);
  740. folly::doNotOptimizeAway(globalAHM->find(key)->second);
  741. }
  742. }
  743. BENCHMARK(st_qpahm_find, iters) {
  744. CHECK_LE(iters, FLAGS_numBMElements);
  745. for (size_t i = 0; i < iters; i++) {
  746. KeyT key = randomizeKey(i);
  747. folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
  748. }
  749. }
  750. BENCHMARK_DRAW_LINE();
  751. BENCHMARK(mt_ahm_miss, iters) {
  752. CHECK_LE(iters, FLAGS_numBMElements);
  753. numOpsPerThread = iters / FLAGS_numThreads;
  754. runThreads([](void* jj) -> void* {
  755. int64_t j = (int64_t)jj;
  756. while (!runThreadsCreatedAllThreads.load()) {
  757. ;
  758. }
  759. for (int i = 0; i < numOpsPerThread; ++i) {
  760. KeyT key = i + j * numOpsPerThread * 100;
  761. folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
  762. }
  763. return nullptr;
  764. });
  765. }
  766. BENCHMARK(mt_qpahm_miss, iters) {
  767. CHECK_LE(iters, FLAGS_numBMElements);
  768. numOpsPerThread = iters / FLAGS_numThreads;
  769. runThreads([](void* jj) -> void* {
  770. int64_t j = (int64_t)jj;
  771. while (!runThreadsCreatedAllThreads.load()) {
  772. ;
  773. }
  774. for (int i = 0; i < numOpsPerThread; ++i) {
  775. KeyT key = i + j * numOpsPerThread * 100;
  776. folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
  777. }
  778. return nullptr;
  779. });
  780. }
  781. BENCHMARK(st_ahm_miss, iters) {
  782. CHECK_LE(iters, FLAGS_numBMElements);
  783. for (size_t i = 0; i < iters; i++) {
  784. KeyT key = randomizeKey(i + iters * 100);
  785. folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
  786. }
  787. }
  788. BENCHMARK(st_qpahm_miss, iters) {
  789. CHECK_LE(iters, FLAGS_numBMElements);
  790. for (size_t i = 0; i < iters; i++) {
  791. KeyT key = randomizeKey(i + iters * 100);
  792. folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
  793. }
  794. }
  795. BENCHMARK(mt_ahm_find_insert_mix, iters) {
  796. CHECK_LE(iters, FLAGS_numBMElements);
  797. numOpsPerThread = iters / FLAGS_numThreads;
  798. runThreads([](void* jj) -> void* {
  799. int64_t j = (int64_t)jj;
  800. while (!runThreadsCreatedAllThreads.load()) {
  801. ;
  802. }
  803. for (int i = 0; i < numOpsPerThread; ++i) {
  804. if (i % 128) { // ~1% insert mix
  805. KeyT key = randomizeKey(i + j * numOpsPerThread);
  806. folly::doNotOptimizeAway(globalAHM->find(key)->second);
  807. } else {
  808. KeyT key = randomizeKey(i + j * numOpsPerThread * 100);
  809. globalAHM->insert(key, genVal(key));
  810. }
  811. }
  812. return nullptr;
  813. });
  814. }
  815. BENCHMARK(mt_qpahm_find_insert_mix, iters) {
  816. CHECK_LE(iters, FLAGS_numBMElements);
  817. numOpsPerThread = iters / FLAGS_numThreads;
  818. runThreads([](void* jj) -> void* {
  819. int64_t j = (int64_t)jj;
  820. while (!runThreadsCreatedAllThreads.load()) {
  821. ;
  822. }
  823. for (int i = 0; i < numOpsPerThread; ++i) {
  824. if (i % 128) { // ~1% insert mix
  825. KeyT key = randomizeKey(i + j * numOpsPerThread);
  826. folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
  827. } else {
  828. KeyT key = randomizeKey(i + j * numOpsPerThread * 100);
  829. globalQPAHM->insert(key, genVal(key));
  830. }
  831. }
  832. return nullptr;
  833. });
  834. }
  835. BENCHMARK(mt_aha_find, iters) {
  836. CHECK_LE(iters, FLAGS_numBMElements);
  837. numOpsPerThread = iters / FLAGS_numThreads;
  838. runThreads([](void* jj) -> void* {
  839. int64_t j = (int64_t)jj;
  840. while (!runThreadsCreatedAllThreads.load()) {
  841. ;
  842. }
  843. for (int i = 0; i < numOpsPerThread; ++i) {
  844. KeyT key = randomizeKey(i + j * numOpsPerThread);
  845. folly::doNotOptimizeAway(globalAHA->find(key)->second);
  846. }
  847. return nullptr;
  848. });
  849. }
  850. BENCHMARK(mt_ahm_find, iters) {
  851. CHECK_LE(iters, FLAGS_numBMElements);
  852. numOpsPerThread = iters / FLAGS_numThreads;
  853. runThreads([](void* jj) -> void* {
  854. int64_t j = (int64_t)jj;
  855. while (!runThreadsCreatedAllThreads.load()) {
  856. ;
  857. }
  858. for (int i = 0; i < numOpsPerThread; ++i) {
  859. KeyT key = randomizeKey(i + j * numOpsPerThread);
  860. folly::doNotOptimizeAway(globalAHM->find(key)->second);
  861. }
  862. return nullptr;
  863. });
  864. }
  865. BENCHMARK(mt_qpahm_find, iters) {
  866. CHECK_LE(iters, FLAGS_numBMElements);
  867. numOpsPerThread = iters / FLAGS_numThreads;
  868. runThreads([](void* jj) -> void* {
  869. int64_t j = (int64_t)jj;
  870. while (!runThreadsCreatedAllThreads.load()) {
  871. ;
  872. }
  873. for (int i = 0; i < numOpsPerThread; ++i) {
  874. KeyT key = randomizeKey(i + j * numOpsPerThread);
  875. folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
  876. }
  877. return nullptr;
  878. });
  879. }
  880. KeyT k;
  881. BENCHMARK(st_baseline_modulus_and_random, iters) {
  882. for (size_t i = 0; i < iters; ++i) {
  883. k = randomizeKey(i) % iters;
  884. }
  885. }
  886. // insertions go last because they reset the map
  887. BENCHMARK(mt_ahm_insert, iters) {
  888. BENCHMARK_SUSPEND {
  889. globalAHM = std::make_unique<AHMapT>(int(iters * LF), config);
  890. numOpsPerThread = iters / FLAGS_numThreads;
  891. }
  892. runThreads(insertThread);
  893. }
  894. BENCHMARK(mt_qpahm_insert, iters) {
  895. BENCHMARK_SUSPEND {
  896. globalQPAHM = std::make_unique<QPAHMapT>(int(iters * LF), qpConfig);
  897. numOpsPerThread = iters / FLAGS_numThreads;
  898. }
  899. runThreads(qpInsertThread);
  900. }
  901. BENCHMARK(st_ahm_insert, iters) {
  902. folly::BenchmarkSuspender susp;
  903. std::unique_ptr<AHMapT> ahm(new AHMapT(int(iters * LF), config));
  904. susp.dismiss();
  905. for (size_t i = 0; i < iters; i++) {
  906. KeyT key = randomizeKey(i);
  907. ahm->insert(key, genVal(key));
  908. }
  909. }
  910. BENCHMARK(st_qpahm_insert, iters) {
  911. folly::BenchmarkSuspender susp;
  912. std::unique_ptr<QPAHMapT> ahm(new QPAHMapT(int(iters * LF), qpConfig));
  913. susp.dismiss();
  914. for (size_t i = 0; i < iters; i++) {
  915. KeyT key = randomizeKey(i);
  916. ahm->insert(key, genVal(key));
  917. }
  918. }
  919. void benchmarkSetup() {
  920. config.maxLoadFactor = FLAGS_maxLoadFactor;
  921. qpConfig.maxLoadFactor = FLAGS_maxLoadFactor;
  922. configRace.maxLoadFactor = 0.5;
  923. int numCores = sysconf(_SC_NPROCESSORS_ONLN);
  924. loadGlobalAha();
  925. loadGlobalAhm();
  926. loadGlobalQPAhm();
  927. string numIters =
  928. folly::to<string>(std::min(1000000, int(FLAGS_numBMElements)));
  929. gflags::SetCommandLineOptionWithMode(
  930. "bm_max_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
  931. gflags::SetCommandLineOptionWithMode(
  932. "bm_min_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
  933. string numCoresStr = folly::to<string>(numCores);
  934. gflags::SetCommandLineOptionWithMode(
  935. "numThreads", numCoresStr.c_str(), gflags::SET_FLAG_IF_DEFAULT);
  936. std::cout << "\nRunning AHM benchmarks on machine with " << numCores
  937. << " logical cores.\n"
  938. " num elements per map: "
  939. << FLAGS_numBMElements << "\n"
  940. << " num threads for mt tests: " << FLAGS_numThreads << "\n"
  941. << " AHM load factor: " << FLAGS_targetLoadFactor << "\n\n";
  942. }
  943. int main(int argc, char** argv) {
  944. testing::InitGoogleTest(&argc, argv);
  945. gflags::ParseCommandLineFlags(&argc, &argv, true);
  946. auto ret = RUN_ALL_TESTS();
  947. if (!ret && FLAGS_benchmark) {
  948. benchmarkSetup();
  949. folly::runBenchmarks();
  950. }
  951. return ret;
  952. }
  953. /*
  954. loading global AHA with 8 threads...
  955. took 487 ms (40 ns/insert).
  956. loading global AHM with 8 threads...
  957. took 478 ms (39 ns/insert).
  958. loading global QPAHM with 8 threads...
  959. took 478 ms (39 ns/insert).
  960. Running AHM benchmarks on machine with 24 logical cores.
  961. num elements per map: 12000000
  962. num threads for mt tests: 24
  963. AHM load factor: 0.75
  964. ============================================================================
  965. folly/test/AtomicHashMapTest.cpp relative time/iter iters/s
  966. ============================================================================
  967. st_aha_find 92.63ns 10.80M
  968. st_ahm_find 107.78ns 9.28M
  969. st_qpahm_find 90.69ns 11.03M
  970. ----------------------------------------------------------------------------
  971. mt_ahm_miss 2.09ns 477.36M
  972. mt_qpahm_miss 1.37ns 728.82M
  973. st_ahm_miss 241.07ns 4.15M
  974. st_qpahm_miss 223.17ns 4.48M
  975. mt_ahm_find_insert_mix 8.05ns 124.24M
  976. mt_qpahm_find_insert_mix 9.10ns 109.85M
  977. mt_aha_find 6.82ns 146.68M
  978. mt_ahm_find 7.95ns 125.77M
  979. mt_qpahm_find 6.81ns 146.83M
  980. st_baseline_modulus_and_random 6.02ns 166.03M
  981. mt_ahm_insert 14.29ns 69.97M
  982. mt_qpahm_insert 11.68ns 85.61M
  983. st_ahm_insert 125.39ns 7.98M
  984. st_qpahm_insert 128.76ns 7.77M
  985. ============================================================================
  986. */