/*
 * Copyright 2018-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <algorithm>
#include <atomic>
#include <climits>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <limits> // std::numeric_limits, used by MIN_VALUE below
#include <mutex>

#include <folly/Random.h>
#include <folly/SpinLock.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>
/// ------ Concurrent Priority Queue Implementation ------
// The concurrent priority queue implementation is based on the
// Mound data structure (Mounds: Array-Based Concurrent Priority Queues,
// by Yujie Liu and Michael Spear, ICPP 2012)
//
/// --- Overview ---
// This relaxed implementation extends the Mound algorithm and provides
// the following features:
// - Arbitrary priorities.
// - Unbounded size.
// - Push, pop, empty, size functions. [TODO: Non-waiting and timed wait pop]
// - Supports blocking.
// - Fast and scalable.
//
/// --- Mound ---
// A Mound is a heap where each element is a sorted linked list.
// The first node of each list maintains the heap property. Push randomly
// selects a leaf at the bottom level, then uses binary search to find
// a place to insert the new node at the head of a list. Pop takes
// the node at the head of the root list, then swaps lists down
// until the heap property holds again. To use Mound in our
// implementation, we need to solve the following problems:
// - 1. Lack of general relaxed implementations. Mound is appealing
//   for a relaxed priority queue implementation because popping the
//   whole list from the root is straightforward: one thread pops the
//   list, and subsequent threads pop from that list until it is empty.
//   Those pops trigger only a single swap-down operation, which reduces
//   both the latency of pop and the contention on the Mound.
//   The difficulty is to provide a scalable and fast mechanism
//   that lets threads concurrently take elements from the list.
// - 2. Lack of control over list length. The length of each list is
//   critical for performance. Mound suffers not only in the extreme
//   cases (pushing with increasing priorities, the Mound becomes a
//   sorted linked list; pushing with decreasing priorities, the Mound
//   becomes a regular heap), but also in the common case (with randomly
//   generated priorities, the Mound degrades to a regular heap after
//   millions of push/pop operations). The difficulty is to stabilize
//   the list length without losing accuracy and performance.
// - 3. No support for blocking. Blocking is an important feature, and
//   the Mound paper does not mention it. Designing a new algorithm for
//   efficient blocking is challenging.
// - 4. Memory management. Mound allows optimistic reads. We need to
//   protect nodes from being reclaimed while they may still be read.
//
/// --- Design ---
// Our implementation extends the Mound algorithm to support
// efficient relaxed pop. We employ a shared-buffer algorithm to
// share a popped list, which makes popping from the shared
// buffer as fast as a fetch_and_add. We improve performance
// and compact the heap structure by stabilizing the size of each list.
// The implementation exposes a template parameter to set the
// preferred list length. Under the hood, we provide algorithms for
// fast inserting, pruning, and merging. The blocking algorithm is
// tricky: a producer wakes at most one consumer at a time, and
// producers never block. For optimistic reads, we use hazard
// pointers to protect nodes from being reclaimed. We optimize the
// check-lock-check pattern by using a test-and-test-and-set spin lock.
//
/// --- Template Parameters: ---
// 1. PopBatch could be 0 or a positive integer.
//    If it is 0, only one node is popped at a time.
//    This is the strict implementation. It guarantees the returned
//    priority is always the highest. If it is > 0, we keep
//    up to that number of nodes in a shared buffer to be consumed by
//    subsequent pop operations.
//
// 2. ListTargetSize represents the minimal length of each list. It
//    solves the problem of inserting into a Mound in decreasing
//    priority order (which degrades it to a heap). Moreover, it keeps
//    the Mound structure stable after trillions of operations, which
//    would otherwise cause an imbalance problem in the original Mound
//    algorithm. We set the pruning length and merging length based on
//    this parameter.
//
/// --- Interface ---
// void push(const T& val)
// void pop(T& val)
// size_t size()
// bool empty()
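//
/// --- Usage (illustrative sketch, not part of the interface) ---
// A minimal single-threaded sketch of the interface above, assuming the
// default template parameters (PopBatch = 16, ListTargetSize = 25).
// Larger values have higher priority; with PopBatch > 0 the pop order is
// relaxed under concurrency, so treat the result below as what happens
// in this single-threaded case, not a general guarantee:
//
//   folly::RelaxedConcurrentPriorityQueue<int> pq;
//   pq.push(3);
//   pq.push(7);
//   int val;
//   pq.pop(val); // val == 7 here; 3 remains (in the shared buffer)
//   bool e = pq.empty(); // false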
namespace folly {

template <
    typename T,
    bool MayBlock = false,
    bool SupportsSize = false,
    size_t PopBatch = 16,
    size_t ListTargetSize = 25,
    typename Mutex = folly::SpinLock,
    template <typename> class Atom = std::atomic>
class RelaxedConcurrentPriorityQueue {
  // Max height of the tree
  static constexpr uint32_t MAX_LEVELS = 32;
  // The default minimum value
  static constexpr T MIN_VALUE = std::numeric_limits<T>::min();

  // Align size for the shared buffer node
  static constexpr size_t Align = 1u << 7;
  static constexpr int LevelForForceInsert = 3;
  static constexpr int LevelForTraverseParent = 7;

  static_assert(PopBatch <= 256, "PopBatch must be <= 256");
  static_assert(
      ListTargetSize >= 1 && ListTargetSize <= 256,
      "TargetSize must be in the range [1, 256]");

  // The maximal length of a list
  static constexpr size_t PruningSize = ListTargetSize * 2;
  // When popping from the Mound, tree elements near the leaf level are
  // likely to hold very short lists. When swapping down after popping a
  // list, we check the sizes of the children to decide whether to merge
  // them into their parent.
  static constexpr size_t MergingSize = ListTargetSize;

  /// List node structure
  struct Node : public folly::hazptr_obj_base<Node, Atom> {
    Node* next;
    T val;
  };

  /// Mound element (tree node); head points to a linked list
  struct MoundElement {
    // (head, size) can be read without acquiring the lock
    Atom<Node*> head;
    Atom<size_t> size;
    alignas(Align) Mutex lock;
    MoundElement() { // initializer
      head.store(nullptr, std::memory_order_relaxed);
      size.store(0, std::memory_order_relaxed);
    }
  };

  /// The Position structure simplifies the implementation
  struct Position {
    uint32_t level;
    uint32_t index;
  };

  /// Nodes in the shared buffer should be aligned
  struct BufferNode {
    alignas(Align) Atom<Node*> pnode;
  };

  /// Data members

  // Mound structure -> 2D array representing a tree
  MoundElement* levels_[MAX_LEVELS];

  // Records the current leaf level (root is 0)
  Atom<uint32_t> bottom_;
  // Used when expanding the tree
  Atom<uint32_t> guard_;

  // Mound with shared buffer
  // The following two members are accessed by consumers
  std::unique_ptr<BufferNode[]> shared_buffer_;
  alignas(Align) Atom<int> top_loc_;

  /// Blocking algorithm
  // Number of futexes in the array
  static constexpr size_t NumFutex = 128;
  // The index gap for accessing futexes in the array
  static constexpr size_t Stride = 33;
  std::unique_ptr<folly::detail::Futex<Atom>[]> futex_array_;
  alignas(Align) Atom<uint32_t> cticket_;
  alignas(Align) Atom<uint32_t> pticket_;

  // Two counters used to calculate the size of the queue
  alignas(Align) Atom<size_t> counter_p_;
  alignas(Align) Atom<size_t> counter_c_;
 public:
  /// Constructor
  RelaxedConcurrentPriorityQueue()
      : cticket_(1), pticket_(1), counter_p_(0), counter_c_(0) {
    if (MayBlock) {
      futex_array_.reset(new folly::detail::Futex<Atom>[NumFutex]);
    }

    if (PopBatch > 0) {
      top_loc_ = -1;
      shared_buffer_.reset(new BufferNode[PopBatch]);
      for (size_t i = 0; i < PopBatch; i++) {
        shared_buffer_[i].pnode = nullptr;
      }
    }
    bottom_.store(0, std::memory_order_relaxed);
    guard_.store(0, std::memory_order_relaxed);
    // allocate the root MoundElement and initialize the Mound
    levels_[0] = new MoundElement[1]; // default MM for MoundElement
    for (uint32_t i = 1; i < MAX_LEVELS; i++) {
      levels_[i] = nullptr;
    }
  }

  ~RelaxedConcurrentPriorityQueue() {
    if (PopBatch > 0) {
      deleteSharedBuffer();
    }
    if (MayBlock) {
      futex_array_.reset();
    }
    Position pos;
    pos.level = pos.index = 0;
    deleteAllNodes(pos);
    // default MM for MoundElement
    for (int i = getBottomLevel(); i >= 0; i--) {
      delete[] levels_[i];
    }
  }

  void push(const T& val) {
    moundPush(val);
    if (SupportsSize) {
      counter_p_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  void pop(T& val) {
    moundPop(val);
    if (SupportsSize) {
      counter_c_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  /// Note: size() and empty() are guaranteed to be accurate only if
  /// the queue is not changed concurrently.
  /// Returns an estimate of the size of the queue
  size_t size() {
    DCHECK(SupportsSize);
    size_t p = counter_p_.load(std::memory_order_acquire);
    size_t c = counter_c_.load(std::memory_order_acquire);
    return (p > c) ? p - c : 0;
  }

  /// Returns true only if the queue was empty during the call.
  bool empty() {
    return isEmpty();
  }
 private:
  uint32_t getBottomLevel() {
    return bottom_.load(std::memory_order_acquire);
  }

  /// This function is only called by the destructor
  void deleteSharedBuffer() {
    DCHECK(PopBatch > 0);
    // delete nodes in the buffer
    int loc = top_loc_.load(std::memory_order_relaxed);
    while (loc >= 0) {
      Node* n = shared_buffer_[loc--].pnode.load(std::memory_order_relaxed);
      delete n;
    }
    // delete the buffer
    shared_buffer_.reset();
  }

  /// This function is only called by the destructor
  void deleteAllNodes(const Position& pos) {
    if (getElementSize(pos) == 0) {
      // the current list is empty; no need to check its children
      return;
    }

    Node* curList = getList(pos);
    setTreeNode(pos, nullptr);
    while (curList != nullptr) { // reclaim nodes
      Node* n = curList;
      curList = curList->next;
      delete n;
    }

    if (!isLeaf(pos)) {
      deleteAllNodes(leftOf(pos));
      deleteAllNodes(rightOf(pos));
    }
  }

  /// Check that the first nodes of the lists maintain the heap property.
  bool isHeap(const Position& pos) {
    if (isLeaf(pos)) {
      return true;
    }
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    return isHeap(lchild) && isHeap(rchild) &&
        readValue(pos) >= readValue(lchild) &&
        readValue(pos) >= readValue(rchild);
  }

  /// Is the current position a leaf?
  FOLLY_ALWAYS_INLINE bool isLeaf(const Position& pos) {
    return pos.level == getBottomLevel();
  }

  /// Is the current element the root?
  FOLLY_ALWAYS_INLINE bool isRoot(const Position& pos) {
    return pos.level == 0;
  }

  /// Locate the parent node
  FOLLY_ALWAYS_INLINE Position parentOf(const Position& pos) {
    Position res;
    res.level = pos.level - 1;
    res.index = pos.index / 2;
    return res;
  }

  /// Locate the left child
  FOLLY_ALWAYS_INLINE Position leftOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2;
    return res;
  }

  /// Locate the right child
  FOLLY_ALWAYS_INLINE Position rightOf(const Position& pos) {
    Position res;
    res.level = pos.level + 1;
    res.index = pos.index * 2 + 1;
    return res;
  }

  /// Get the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE size_t getElementSize(const Position& pos) {
    return levels_[pos.level][pos.index].size.load(std::memory_order_relaxed);
  }

  /// Set the list size of the current MoundElement
  FOLLY_ALWAYS_INLINE void setElementSize(
      const Position& pos,
      const uint32_t& v) {
    levels_[pos.level][pos.index].size.store(v, std::memory_order_relaxed);
  }
  /// Extend the tree by one level
  void grow(uint32_t btm) {
    while (true) {
      if (guard_.fetch_add(1, std::memory_order_acq_rel) == 0) {
        break;
      }
      // someone else already expanded the tree
      if (btm != getBottomLevel()) {
        return;
      }
      std::this_thread::yield();
    }
    // double-check that the bottom level has not changed yet
    if (btm != getBottomLevel()) {
      guard_.store(0, std::memory_order_release);
      return;
    }
    // create and initialize the new level
    uint32_t tmp_btm = getBottomLevel();
    uint32_t size = 1 << (tmp_btm + 1);
    MoundElement* new_level = new MoundElement[size]; // MM
    levels_[tmp_btm + 1] = new_level;
    bottom_.store(tmp_btm + 1, std::memory_order_release);
    guard_.store(0, std::memory_order_release);
  }
  /// TODO: optimization
  // This function is important: it selects a position at which to insert
  // the node. There are two execution paths when it returns:
  // 1. It returns a position whose head node has lower priority than the
  //    target, so it can be used as the starting element for the binary
  //    search that finds the fitting position. (slow path)
  // 2. It returns a position that is not the best fit,
  //    but that prevents aggressively growing the Mound. (fast path)
  Position selectPosition(
      const T& val,
      bool& path,
      uint32_t& seed,
      folly::hazptr_holder<Atom>& hptr) {
    while (true) {
      uint32_t b = getBottomLevel();
      int bound = 1 << b; // number of elements at this level
      int steps = 1 + b * b; // probe length
      ++seed;
      uint32_t index = seed % bound;

      for (int i = 0; i < steps; i++) {
        int loc = (index + i) % bound;
        Position pos;
        pos.level = b;
        pos.index = loc;
        // in the first round, we do the quick check
        if (optimisticReadValue(pos, hptr) <= val) {
          path = false;
          seed = ++loc;
          return pos;
        } else if (
            b > LevelForForceInsert && getElementSize(pos) < ListTargetSize) {
          // [fast path] conservative implementation:
          // it makes sure every tree element holds
          // at least the given number of nodes.
          seed = ++loc;
          path = true;
          return pos;
        }
        if (b != getBottomLevel()) {
          break;
        }
      }
      // failed too many times: grow the tree
      if (b == getBottomLevel()) {
        grow(b);
      }
    }
  }
  /// Swap two tree elements (head, size)
  void swapList(const Position& a, const Position& b) {
    Node* tmp = getList(a);
    setTreeNode(a, getList(b));
    setTreeNode(b, tmp);

    // also swap the tree-node metadata
    uint32_t sa = getElementSize(a);
    uint32_t sb = getElementSize(b);
    setElementSize(a, sb);
    setElementSize(b, sa);
  }

  FOLLY_ALWAYS_INLINE void lockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.lock();
  }

  FOLLY_ALWAYS_INLINE void unlockNode(const Position& pos) {
    levels_[pos.level][pos.index].lock.unlock();
  }

  FOLLY_ALWAYS_INLINE bool trylockNode(const Position& pos) {
    return levels_[pos.level][pos.index].lock.try_lock();
  }

  FOLLY_ALWAYS_INLINE T
  optimisticReadValue(const Position& pos, folly::hazptr_holder<Atom>& hptr) {
    Node* tmp = hptr.get_protected(levels_[pos.level][pos.index].head);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  // Get the value at the head of the list as the element value
  FOLLY_ALWAYS_INLINE T readValue(const Position& pos) {
    Node* tmp = getList(pos);
    return (tmp == nullptr) ? MIN_VALUE : tmp->val;
  }

  FOLLY_ALWAYS_INLINE Node* getList(const Position& pos) {
    return levels_[pos.level][pos.index].head.load(std::memory_order_relaxed);
  }

  FOLLY_ALWAYS_INLINE void setTreeNode(const Position& pos, Node* t) {
    levels_[pos.level][pos.index].head.store(t, std::memory_order_relaxed);
  }
  // Merge two sorted lists
  Node* mergeList(Node* base, Node* source) {
    if (base == nullptr) {
      return source;
    } else if (source == nullptr) {
      return base;
    }

    Node *res, *p;
    // choose the head node
    if (base->val >= source->val) {
      res = base;
      base = base->next;
      p = res;
    } else {
      res = source;
      source = source->next;
      p = res;
    }

    while (base != nullptr && source != nullptr) {
      if (base->val >= source->val) {
        p->next = base;
        base = base->next;
      } else {
        p->next = source;
        source = source->next;
      }
      p = p->next;
    }
    if (base == nullptr) {
      p->next = source;
    } else {
      p->next = base;
    }
    return res;
  }

  /// Merge list t into the element at Position pos
  void mergeListTo(const Position& pos, Node* t, const size_t& list_length) {
    Node* head = getList(pos);
    setTreeNode(pos, mergeList(head, t));
    uint32_t ns = getElementSize(pos) + list_length;
    setElementSize(pos, ns);
  }
  bool pruningLeaf(const Position& pos) {
    if (getElementSize(pos) <= PruningSize) {
      unlockNode(pos);
      return true;
    }

    int b = getBottomLevel();
    int leaves = 1 << b;
    int cnodes = 0;
    for (int i = 0; i < leaves; i++) {
      Position tmp;
      tmp.level = b;
      tmp.index = i;
      if (getElementSize(tmp) != 0) {
        cnodes++;
      }
      if (cnodes > leaves * 2 / 3) {
        break;
      }
    }

    if (cnodes <= leaves * 2 / 3) {
      unlockNode(pos);
      return true;
    }
    return false;
  }
  /// Split the current list into two lists,
  /// then split the tail list and merge it into the two children.
  void startPruning(const Position& pos) {
    if (isLeaf(pos) && pruningLeaf(pos)) {
      return;
    }

    // split the list, record the tail
    Node* pruning_head = getList(pos);
    int steps = ListTargetSize; // number of nodes kept in the original list
    for (int i = 0; i < steps - 1; i++) {
      pruning_head = pruning_head->next;
    }
    Node* t = pruning_head;
    pruning_head = pruning_head->next;
    t->next = nullptr;
    int tail_length = getElementSize(pos) - steps;
    setElementSize(pos, steps);

    // split the tail list into two lists and
    // merge them evenly into the two children
    if (pos.level != getBottomLevel()) {
      // split the rest into two lists
      int left_length = (tail_length + 1) / 2;
      int right_length = tail_length - left_length;
      Node *to_right, *to_left = pruning_head;
      for (int i = 0; i < left_length - 1; i++) {
        pruning_head = pruning_head->next;
      }
      to_right = pruning_head->next;
      pruning_head->next = nullptr;

      Position lchild = leftOf(pos);
      Position rchild = rightOf(pos);
      if (left_length != 0) {
        lockNode(lchild);
        mergeListTo(lchild, to_left, left_length);
      }
      if (right_length != 0) {
        lockNode(rchild);
        mergeListTo(rchild, to_right, right_length);
      }
      unlockNode(pos);

      if (left_length != 0 && getElementSize(lchild) > PruningSize) {
        startPruning(lchild);
      } else if (left_length != 0) {
        unlockNode(lchild);
      }
      if (right_length != 0 && getElementSize(rchild) > PruningSize) {
        startPruning(rchild);
      } else if (right_length != 0) {
        unlockNode(rchild);
      }
    } else { // time to grow the Mound
      grow(pos.level);
      // choose a child to insert into
      if (steps % 2 == 1) {
        Position rchild = rightOf(pos);
        lockNode(rchild);
        mergeListTo(rchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(rchild);
      } else {
        Position lchild = leftOf(pos);
        lockNode(lchild);
        mergeListTo(lchild, pruning_head, tail_length);
        unlockNode(pos);
        unlockNode(lchild);
      }
    }
  }
  // This function always inserts the new node at the head of the
  // current list. It needs to lock both the parent and the current
  // position. It may make the list too long, so we provide the
  // pruning algorithm above.
  bool regularInsert(const Position& pos, const T& val, Node* newNode) {
    // insert at the root node
    if (isRoot(pos)) {
      lockNode(pos);
      T nv = readValue(pos);
      if (LIKELY(nv <= val)) {
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        uint32_t sz = getElementSize(pos);
        setElementSize(pos, sz + 1);
        if (UNLIKELY(sz > PruningSize)) {
          startPruning(pos);
        } else {
          unlockNode(pos);
        }
        return true;
      }
      unlockNode(pos);
      return false;
    }

    // insert at an inner node
    Position parent = parentOf(pos);
    if (!trylockNode(parent)) {
      return false;
    }
    if (!trylockNode(pos)) {
      unlockNode(parent);
      return false;
    }

    T pv = readValue(parent);
    T nv = readValue(pos);
    if (LIKELY(pv > val && nv <= val)) {
      // Improve accuracy by taking the nodes (R) with lower priority than
      // the new value from the parent list: insert the new node into the
      // parent list and insert R into the current list.
      // This only happens at levels >= LevelForTraverseParent, to reduce
      // contention.
      uint32_t sz = getElementSize(pos);
      if (pos.level >= LevelForTraverseParent) {
        Node* start = getList(parent);
        while (start->next != nullptr && start->next->val >= val) {
          start = start->next;
        }
        if (start->next != nullptr) {
          newNode->next = start->next;
          start->next = newNode;
          while (start->next->next != nullptr) {
            start = start->next;
          }
          newNode = start->next;
          start->next = nullptr;
        }
        unlockNode(parent);

        Node* curList = getList(pos);
        if (curList == nullptr) { // insert into an empty list
          newNode->next = nullptr;
          setTreeNode(pos, newNode);
        } else {
          Node* p = curList;
          if (p->val <= newNode->val) { // insert at the head
            newNode->next = curList;
            setTreeNode(pos, newNode);
          } else { // insert in the middle
            while (p->next != nullptr && p->next->val >= newNode->val) {
              p = p->next;
            }
            newNode->next = p->next;
            p->next = newNode;
          }
        }
        setElementSize(pos, sz + 1);
      } else {
        unlockNode(parent);
        newNode->next = getList(pos);
        setTreeNode(pos, newNode);
        setElementSize(pos, sz + 1);
      }
      if (UNLIKELY(sz > PruningSize)) {
        startPruning(pos);
      } else {
        unlockNode(pos);
      }
      return true;
    }
    unlockNode(parent);
    unlockNode(pos);
    return false;
  }
  bool forceInsertToRoot(Node* newNode) {
    Position pos;
    pos.level = pos.index = 0;
    std::unique_lock<Mutex> lck(
        levels_[pos.level][pos.index].lock, std::try_to_lock);
    if (!lck.owns_lock()) {
      return false;
    }
    uint32_t sz = getElementSize(pos);
    if (sz >= ListTargetSize) {
      return false;
    }

    Node* curList = getList(pos);
    if (curList == nullptr) { // insert into an empty list
      newNode->next = nullptr;
      setTreeNode(pos, newNode);
    } else {
      Node* p = curList;
      if (p->val <= newNode->val) { // insert at the head
        newNode->next = curList;
        setTreeNode(pos, newNode);
      } else { // insert in the middle
        while (p->next != nullptr && p->next->val >= newNode->val) {
          p = p->next;
        }
        newNode->next = p->next;
        p->next = newNode;
      }
    }
    setElementSize(pos, sz + 1);
    return true;
  }
  // This function forces the new node to be inserted at the current
  // position if the element does not yet hold enough nodes. It is safe
  // to lock just this one position, because the new node never becomes
  // the first node, which is what sustains the heap property.
  bool forceInsert(const Position& pos, const T& val, Node* newNode) {
    if (isRoot(pos)) {
      return forceInsertToRoot(newNode);
    }

    while (true) {
      std::unique_lock<Mutex> lck(
          levels_[pos.level][pos.index].lock, std::try_to_lock);
      if (!lck.owns_lock()) {
        if (getElementSize(pos) < ListTargetSize && readValue(pos) >= val) {
          continue;
        } else {
          return false;
        }
      }
      T nv = readValue(pos);
      uint32_t sz = getElementSize(pos);
      // do not allow the new node to become the first one,
      // and do not allow the list to grow too big
      if (UNLIKELY(nv < val || sz >= ListTargetSize)) {
        return false;
      }

      Node* p = getList(pos);
      // find a place to insert the node
      while (p->next != nullptr && p->next->val > val) {
        p = p->next;
      }
      newNode->next = p->next;
      p->next = newNode;
      // do not forget to update the metadata
      setElementSize(pos, sz + 1);
      return true;
    }
  }
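  // Binary search on the root-to-cur path: the ancestor of cur at level l
  // has index (cur.index >> (cur.level - l)). The search narrows the range
  // [parent, cur] until cur is the highest position on the path whose head
  // value is <= val; reads are optimistic, protected by the hazard pointer.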
  void binarySearchPosition(
      Position& cur,
      const T& val,
      folly::hazptr_holder<Atom>& hptr) {
    Position parent, mid;
    if (cur.level == 0) {
      return;
    }
    // start from the root
    parent.level = parent.index = 0;

    while (true) { // binary search
      mid.level = (cur.level + parent.level) / 2;
      mid.index = cur.index >> (cur.level - mid.level);
      T mv = optimisticReadValue(mid, hptr);
      if (val < mv) {
        parent = mid;
      } else {
        cur = mid;
      }

      if (mid.level == 0 || // the root
          ((parent.level + 1 == cur.level) && parent.level != 0)) {
        return;
      }
    }
  }
  // The push keeps the length of each element stable
  void moundPush(const T& val) {
    Position cur;
    folly::hazptr_holder<Atom> hptr;
    Node* newNode = new Node;
    newNode->val = val;
    uint32_t seed = folly::Random::rand32() % (1 << 21);

    while (true) {
      // shall we take the fast path?
      bool go_fast_path = false;
      // choose the right node to start from
      cur = selectPosition(val, go_fast_path, seed, hptr);
      if (go_fast_path) {
        if (LIKELY(forceInsert(cur, val, newNode))) {
          if (MayBlock) {
            blockingPushImpl();
          }
          return;
        } else {
          continue;
        }
      }

      binarySearchPosition(cur, val, hptr);
      if (LIKELY(regularInsert(cur, val, newNode))) {
        if (MayBlock) {
          blockingPushImpl();
        }
        return;
      }
    }
  }
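  // Move up to PopBatch nodes following the popped head into the shared
  // buffer, filling slots from index num-1 down to 0 so that top_loc_ is
  // published only once, at the end. Returns the number of nodes left in
  // the root list.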
  int popToSharedBuffer(const uint32_t rsize, Node* head) {
    Position pos;
    pos.level = pos.index = 0;

    int num = std::min(rsize, (uint32_t)PopBatch);
    for (int i = num - 1; i >= 0; i--) {
      // wait until this slot is empty
      while (shared_buffer_[i].pnode.load(std::memory_order_relaxed) !=
             nullptr) {
      }
      shared_buffer_[i].pnode.store(head, std::memory_order_relaxed);
      head = head->next;
    }
    if (num > 0) {
      top_loc_.store(num - 1, std::memory_order_release);
    }
    setTreeNode(pos, head);
    return rsize - num;
  }
  void mergeDown(const Position& pos) {
    if (isLeaf(pos)) {
      unlockNode(pos);
      return;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    // read values
    T nv = readValue(pos);
    T lv = readValue(lchild);
    T rv = readValue(rchild);
    if (nv >= lv && nv >= rv) {
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return;
    }

    // If the parent and its two children together hold no more nodes
    // than the threshold, we merge both children into the parent
    // and then do merge-down on both of them.
    size_t sum =
        getElementSize(rchild) + getElementSize(lchild) + getElementSize(pos);
    if (sum <= MergingSize) {
      Node* l1 = mergeList(getList(rchild), getList(lchild));
      setTreeNode(pos, mergeList(l1, getList(pos)));
      setElementSize(pos, sum);
      setTreeNode(lchild, nullptr);
      setElementSize(lchild, 0);
      setTreeNode(rchild, nullptr);
      setElementSize(rchild, 0);
      unlockNode(pos);
      mergeDown(lchild);
      mergeDown(rchild);
      return;
    }

    // pull from the right child
    if (rv >= lv && rv > nv) {
      swapList(rchild, pos);
      unlockNode(pos);
      unlockNode(lchild);
      mergeDown(rchild);
    } else if (lv >= rv && lv > nv) {
      // pull from the left child
      swapList(lchild, pos);
      unlockNode(pos);
      unlockNode(rchild);
      mergeDown(lchild);
    }
  }
  bool deferSettingRootSize(Position& pos) {
    if (isLeaf(pos)) {
      setElementSize(pos, 0);
      unlockNode(pos);
      return true;
    }

    // acquire locks for L and R and compare
    Position lchild = leftOf(pos);
    Position rchild = rightOf(pos);
    lockNode(lchild);
    lockNode(rchild);
    if (getElementSize(lchild) == 0 && getElementSize(rchild) == 0) {
      setElementSize(pos, 0);
      unlockNode(pos);
      unlockNode(lchild);
      unlockNode(rchild);
      return true;
    } else {
      // read values
      T lv = readValue(lchild);
      T rv = readValue(rchild);
      if (lv >= rv) {
        swapList(lchild, pos);
        setElementSize(lchild, 0);
        unlockNode(pos);
        unlockNode(rchild);
        pos = lchild;
      } else {
        swapList(rchild, pos);
        setElementSize(rchild, 0);
        unlockNode(pos);
        unlockNode(lchild);
        pos = rchild;
      }
      return false;
    }
  }
  bool moundPopMany(T& val) {
    // pop from the root
    Position pos;
    pos.level = pos.index = 0;
    // if the root list is empty, return false
    Node* head = getList(pos);
    if (head == nullptr) {
      unlockNode(pos);
      return false;
    }

    // the shared buffer has already been filled by other threads
    if (PopBatch > 0 && top_loc_.load(std::memory_order_acquire) >= 0) {
      unlockNode(pos);
      return false;
    }

    uint32_t sz = getElementSize(pos);
    // take the first node
    val = head->val;
    Node* p = head;
    head = head->next;
    sz--;

    if (PopBatch > 0) {
      sz = popToSharedBuffer(sz, head);
    } else {
      setTreeNode(pos, head);
    }

    bool done = false;
    if (LIKELY(sz == 0)) {
      done = deferSettingRootSize(pos);
    } else {
      setElementSize(pos, sz);
    }

    if (LIKELY(!done)) {
      mergeDown(pos);
    }

    p->retire();
    return true;
  }
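  // Each futex word encodes a push ticket in its upper 31 bits and a
  // "consumers waiting" flag in its lowest bit. For example, a push with
  // ticket p = 5 publishes (5 << 1) = 10; if a consumer is blocked on that
  // slot the word reads 11, and the CAS below both publishes the ticket
  // and clears the flag before waking the sleepers.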
  void blockingPushImpl() {
    auto p = pticket_.fetch_add(1, std::memory_order_acq_rel);
    auto loc = getFutexArrayLoc(p);
    uint32_t curfutex = futex_array_[loc].load(std::memory_order_acquire);

    while (true) {
      uint32_t ready = p << 1; // the ticket occupies the upper 31 bits
      // avoid overwriting a value already set by a push with a larger ticket
      if (UNLIKELY(
              ready + 1 < curfutex ||
              ((curfutex > ready) && (curfutex - ready > 0x40000000)))) {
        return;
      }
      if (futex_array_[loc].compare_exchange_strong(curfutex, ready)) {
        if (curfutex &
            1) { // one or more consumers may be blocked on this futex
          detail::futexWake(&futex_array_[loc]);
        }
        return;
      } else {
        curfutex = futex_array_[loc].load(std::memory_order_acquire);
      }
    }
  }
  // Return true only if the Mound is empty
  FOLLY_ALWAYS_INLINE bool isMoundEmpty() {
    Position pos;
    pos.level = pos.index = 0;
    return getElementSize(pos) == 0;
  }

  // Return true if the shared buffer is empty
  FOLLY_ALWAYS_INLINE bool isSharedBufferEmpty() {
    return top_loc_.load(std::memory_order_acquire) < 0;
  }

  FOLLY_ALWAYS_INLINE bool isEmpty() {
    if (PopBatch > 0) {
      return isMoundEmpty() && isSharedBufferEmpty();
    }
    return isMoundEmpty();
  }
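  // A consumer ticket is "ready" once some push has published a ticket at
  // least as large at the same slot. Tickets are compared in 31 bits, so
  // wraparound is detected by distance: e.g. futex_ready = 2 versus
  // short_cticket = 0x7FFFFFFE differ by more than 0x40000000, so the
  // small value is treated as having wrapped past the large one and the
  // slot counts as ready.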
  FOLLY_ALWAYS_INLINE bool futexIsReady(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    uint32_t short_cticket = curticket & 0x7FFFFFFF;
    uint32_t futex_ready = curfutex >> 1;
    // handle unsigned 31-bit overflow
    return futex_ready >= short_cticket ||
        short_cticket - futex_ready > 0x40000000;
  }

  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool trySpinBeforeBlock(
      const size_t& curticket,
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    return folly::detail::spin_pause_until(deadline, opt, [=] {
      return futexIsReady(curticket);
    }) == folly::detail::spin_result::success;
  }
  void tryBlockingPop(const size_t& curticket) {
    auto loc = getFutexArrayLoc(curticket);
    auto curfutex = futex_array_[loc].load(std::memory_order_acquire);
    if (curfutex &
        1) { /// consumers from the last round are still waiting; go to sleep
      detail::futexWait(&futex_array_[loc], curfutex);
    }
    if (trySpinBeforeBlock(
            curticket,
            std::chrono::time_point<std::chrono::steady_clock>::max())) {
      return; /// the push ticket became ready while spinning
    }
    while (true) {
      curfutex = futex_array_[loc].load(std::memory_order_acquire);
      if (curfutex &
          1) { /// consumers from the last round are still waiting; go to sleep
        detail::futexWait(&futex_array_[loc], curfutex);
      } else if (!futexIsReady(curticket)) { // current ticket < pop ticket
        uint32_t blocking_futex = curfutex + 1;
        if (futex_array_[loc].compare_exchange_strong(
                curfutex, blocking_futex)) {
          detail::futexWait(&futex_array_[loc], blocking_futex);
        }
      } else {
        return;
      }
    }
  }

  void blockingPopImpl() {
    auto ct = cticket_.fetch_add(1, std::memory_order_acq_rel);
    // fast path check
    if (futexIsReady(ct)) {
      return;
    }
    // blocking
    tryBlockingPop(ct);
  }
  bool tryPopFromMound(T& val) {
    if (isMoundEmpty()) {
      return false;
    }
    Position pos;
    pos.level = pos.index = 0;

    // lock the root
    if (trylockNode(pos)) {
      return moundPopMany(val);
    }
    return false;
  }

  FOLLY_ALWAYS_INLINE static folly::WaitOptions wait_options() {
    return {};
  }
  template <typename Clock, typename Duration>
  FOLLY_NOINLINE bool tryWait(
      const std::chrono::time_point<Clock, Duration>& deadline,
      const folly::WaitOptions& opt = wait_options()) {
    // fast path: quickly check the status
    switch (folly::detail::spin_pause_until(
        deadline, opt, [=] { return !isEmpty(); })) {
      case folly::detail::spin_result::success:
        return true;
      case folly::detail::spin_result::timeout:
        return false;
      case folly::detail::spin_result::advance:
        break;
    }

    // spinning strategy
    while (true) {
      auto res =
          folly::detail::spin_yield_until(deadline, [=] { return !isEmpty(); });
      if (res == folly::detail::spin_result::success) {
        return true;
      } else if (res == folly::detail::spin_result::timeout) {
        return false;
      }
    }
    return true;
  }
  bool tryPopFromSharedBuffer(T& val) {
    int get_or = -1;
    if (!isSharedBufferEmpty()) {
      get_or = top_loc_.fetch_sub(1, std::memory_order_acq_rel);
      if (get_or >= 0) {
        Node* c = shared_buffer_[get_or].pnode.load(std::memory_order_relaxed);
        shared_buffer_[get_or].pnode.store(nullptr, std::memory_order_release);
        val = c->val;
        c->retire();
        return true;
      }
    }
    return false;
  }
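  // Map a ticket to a futex slot. Stride 33 is coprime with the array size
  // (NumFutex = 128, a power of two), so consecutive tickets land 33 slots
  // apart and still cycle through all 128 slots, which keeps adjacent
  // tickets off neighboring words and spreads waiters across cache lines.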
  size_t getFutexArrayLoc(size_t s) {
    return ((s - 1) * Stride) & (NumFutex - 1);
  }
  void moundPop(T& val) {
    if (MayBlock) {
      blockingPopImpl();
    }

    if (PopBatch > 0) {
      if (tryPopFromSharedBuffer(val)) {
        return;
      }
    }

    while (true) {
      if (LIKELY(tryPopFromMound(val))) {
        return;
      }
      tryWait(std::chrono::time_point<std::chrono::steady_clock>::max());
      if (PopBatch > 0 && tryPopFromSharedBuffer(val)) {
        return;
      }
    }
  }
};

} // namespace folly