FlatCombiningPriorityQueue.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <atomic>
  18. #include <chrono>
  19. #include <memory>
  20. #include <mutex>
  21. #include <queue>
  22. #include <folly/Optional.h>
  23. #include <folly/detail/Futex.h>
  24. #include <folly/experimental/flat_combining/FlatCombining.h>
  25. #include <glog/logging.h>
  26. namespace folly {
  27. /// Thread-safe priority queue based on flat combining. If the
  28. /// constructor parameter maxSize is greater than 0 (default = 0),
  29. /// then the queue is bounded. This template provides blocking,
  30. /// non-blocking, and timed variants of each of push(), pop(), and
  31. /// peek() operations. The empty() and size() functions are inherently
  32. /// non-blocking.
  33. ///
  34. /// PriorityQueue must support the interface of std::priority_queue,
  35. /// specifically empty(), size(), push(), top(), and pop(). Mutex
  36. /// must meet the standard Lockable requirements.
  37. ///
  38. /// By default FlatCombining uses a dedicated combiner thread, which
  39. /// yields better latency and throughput under high contention but
  40. /// higher overheads under low contention. If the constructor
  41. /// parameter dedicated is false, then there will be no dedicated
  42. /// combiner thread and any requester may do combining of operations
  43. /// requested by other threads. For more details see the comments for
  44. /// FlatCombining.
  45. ///
  46. /// Usage examples:
  47. /// @code
  48. /// FlatCombiningPriorityQueue<int> pq(1);
  49. /// CHECK(pq.empty());
  50. /// CHECK(pq.size() == 0);
  51. /// int v;
  52. /// CHECK(!pq.try_pop(v));
  53. /// CHECK(!pq.try_pop_until(now() + seconds(1)));
  54. /// CHECK(!pq.try_peek(v));
  55. /// CHECK(!pq.try_peek_until(now() + seconds(1)));
  56. /// pq.push(10);
  57. /// CHECK(!pq.empty());
  58. /// CHECK(pq.size() == 1);
  59. /// CHECK(!pq.try_push(20));
  60. /// CHECK(!pq.try_push_until(20, now() + seconds(1)));
  61. /// pq.peek(v);
  62. /// CHECK_EQ(v, 10);
  63. /// CHECK(pq.size() == 1);
  64. /// pq.pop(v);
  65. /// CHECK_EQ(v, 10);
  66. /// CHECK(pq.empty());
  67. /// @endcode
  68. template <
  69. typename T,
  70. typename PriorityQueue = std::priority_queue<T>,
  71. typename Mutex = std::mutex,
  72. template <typename> class Atom = std::atomic>
  73. class FlatCombiningPriorityQueue
  74. : public folly::FlatCombining<
  75. FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>,
  76. Mutex,
  77. Atom> {
  78. using FCPQ = FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>;
  79. using FC = folly::FlatCombining<FCPQ, Mutex, Atom>;
  80. public:
  81. template <
  82. typename... PQArgs,
  83. typename = decltype(PriorityQueue(std::declval<PQArgs>()...))>
  84. explicit FlatCombiningPriorityQueue(
  85. // Concurrent priority queue parameter
  86. const size_t maxSize = 0,
  87. // Flat combining parameters
  88. const bool dedicated = true,
  89. const uint32_t numRecs = 0,
  90. const uint32_t maxOps = 0,
  91. // (Sequential) PriorityQueue Parameters
  92. PQArgs... args)
  93. : FC(dedicated, numRecs, maxOps),
  94. maxSize_(maxSize),
  95. pq_(std::forward<PQArgs>(args)...) {}
  96. /// Returns true iff the priority queue is empty
  97. bool empty() const {
  98. bool res;
  99. auto fn = [&] { res = pq_.empty(); };
  100. const_cast<FCPQ*>(this)->requestFC(fn);
  101. return res;
  102. }
  103. /// Returns the number of items in the priority queue
  104. size_t size() const {
  105. size_t res;
  106. auto fn = [&] { res = pq_.size(); };
  107. const_cast<FCPQ*>(this)->requestFC(fn);
  108. return res;
  109. }
  110. /// Non-blocking push. Succeeds if there is space in the priority
  111. /// queue to insert the new item. Tries once if no time point is
  112. /// provided or until the provided time_point is reached. If
  113. /// successful, inserts the provided item in the priority queue
  114. /// according to its priority.
  115. bool try_push(const T& val) {
  116. return try_push_impl(
  117. val, std::chrono::time_point<std::chrono::steady_clock>::min());
  118. }
  119. /// Non-blocking pop. Succeeds if the priority queue is
  120. /// nonempty. Tries once if no time point is provided or until the
  121. /// provided time_point is reached. If successful, copies the
  122. /// highest priority item and removes it from the priority queue.
  123. bool try_pop(T& val) {
  124. return try_pop_impl(
  125. val, std::chrono::time_point<std::chrono::steady_clock>::min());
  126. }
  127. /// Non-blocking peek. Succeeds if the priority queue is
  128. /// nonempty. Tries once if no time point is provided or until the
  129. /// provided time_point is reached. If successful, copies the
  130. /// highest priority item without removing it.
  131. bool try_peek(T& val) {
  132. return try_peek_impl(
  133. val, std::chrono::time_point<std::chrono::steady_clock>::min());
  134. }
  135. /// Blocking push. Inserts the provided item in the priority
  136. /// queue. If it is full, this function blocks until there is space
  137. /// for the new item.
  138. void push(const T& val) {
  139. try_push_impl(
  140. val, std::chrono::time_point<std::chrono::steady_clock>::max());
  141. }
  142. /// Blocking pop. Copies the highest priority item and removes
  143. /// it. If the priority queue is empty, this function blocks until
  144. /// it is nonempty.
  145. void pop(T& val) {
  146. try_pop_impl(
  147. val, std::chrono::time_point<std::chrono::steady_clock>::max());
  148. }
  149. /// Blocking peek. Copies the highest priority item without
  150. /// removing it. If the priority queue is empty, this function
  151. /// blocks until it is nonempty.
  152. void peek(T& val) {
  153. try_peek_impl(
  154. val, std::chrono::time_point<std::chrono::steady_clock>::max());
  155. }
  156. folly::Optional<T> try_pop() {
  157. T val;
  158. if (try_pop(val)) {
  159. return std::move(val);
  160. }
  161. return folly::none;
  162. }
  163. folly::Optional<T> try_peek() {
  164. T val;
  165. if (try_peek(val)) {
  166. return std::move(val);
  167. }
  168. return folly::none;
  169. }
  170. template <typename Rep, typename Period>
  171. folly::Optional<T> try_pop_for(
  172. const std::chrono::duration<Rep, Period>& timeout) {
  173. T val;
  174. if (try_pop(val) ||
  175. try_pop_impl(val, std::chrono::steady_clock::now() + timeout)) {
  176. return std::move(val);
  177. }
  178. return folly::none;
  179. }
  180. template <typename Rep, typename Period>
  181. bool try_push_for(
  182. const T& val,
  183. const std::chrono::duration<Rep, Period>& timeout) {
  184. return (
  185. try_push(val) ||
  186. try_push_impl(val, std::chrono::steady_clock::now() + timeout));
  187. }
  188. template <typename Rep, typename Period>
  189. folly::Optional<T> try_peek_for(
  190. const std::chrono::duration<Rep, Period>& timeout) {
  191. T val;
  192. if (try_peek(val) ||
  193. try_peek_impl(val, std::chrono::steady_clock::now() + timeout)) {
  194. return std::move(val);
  195. }
  196. return folly::none;
  197. }
  198. template <typename Clock, typename Duration>
  199. folly::Optional<T> try_pop_until(
  200. const std::chrono::time_point<Clock, Duration>& deadline) {
  201. T val;
  202. if (try_pop_impl(val, deadline)) {
  203. return std::move(val);
  204. }
  205. return folly::none;
  206. }
  207. template <typename Clock, typename Duration>
  208. bool try_push_until(
  209. const T& val,
  210. const std::chrono::time_point<Clock, Duration>& deadline) {
  211. return try_push_impl(val, deadline);
  212. }
  213. template <typename Clock, typename Duration>
  214. folly::Optional<T> try_peek_until(
  215. const std::chrono::time_point<Clock, Duration>& deadline) {
  216. T val;
  217. if (try_peek_impl(val, deadline)) {
  218. return std::move(val);
  219. }
  220. return folly::none;
  221. }
  222. private:
  223. size_t maxSize_;
  224. PriorityQueue pq_;
  225. detail::Futex<Atom> empty_{};
  226. detail::Futex<Atom> full_{};
  227. bool isTrue(detail::Futex<Atom>& futex) {
  228. return futex.load(std::memory_order_relaxed) != 0;
  229. }
  230. void setFutex(detail::Futex<Atom>& futex, uint32_t val) {
  231. futex.store(val, std::memory_order_relaxed);
  232. }
  233. bool futexSignal(detail::Futex<Atom>& futex) {
  234. if (isTrue(futex)) {
  235. setFutex(futex, 0);
  236. return true;
  237. } else {
  238. return false;
  239. }
  240. }
  241. template <typename Clock, typename Duration>
  242. bool try_push_impl(
  243. const T& val,
  244. const std::chrono::time_point<Clock, Duration>& when);
  245. template <typename Clock, typename Duration>
  246. bool try_pop_impl(
  247. T& val,
  248. const std::chrono::time_point<Clock, Duration>& when);
  249. template <typename Clock, typename Duration>
  250. bool try_peek_impl(
  251. T& val,
  252. const std::chrono::time_point<Clock, Duration>& when);
  253. };
  254. /// Implementation
  255. template <
  256. typename T,
  257. typename PriorityQueue,
  258. typename Mutex,
  259. template <typename> class Atom>
  260. template <typename Clock, typename Duration>
  261. inline bool
  262. FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_push_impl(
  263. const T& val,
  264. const std::chrono::time_point<Clock, Duration>& when) {
  265. while (true) {
  266. bool res;
  267. bool wake;
  268. auto fn = [&] {
  269. if (maxSize_ > 0 && pq_.size() == maxSize_) {
  270. setFutex(full_, 1);
  271. res = false;
  272. return;
  273. }
  274. DCHECK(maxSize_ == 0 || pq_.size() < maxSize_);
  275. try {
  276. pq_.push(val);
  277. wake = futexSignal(empty_);
  278. res = true;
  279. return;
  280. } catch (const std::bad_alloc&) {
  281. setFutex(full_, 1);
  282. res = false;
  283. return;
  284. }
  285. };
  286. this->requestFC(fn);
  287. if (res) {
  288. if (wake) {
  289. detail::futexWake(&empty_);
  290. }
  291. return true;
  292. }
  293. if (when == std::chrono::time_point<Clock>::min()) {
  294. return false;
  295. }
  296. while (isTrue(full_)) {
  297. if (when == std::chrono::time_point<Clock>::max()) {
  298. detail::futexWait(&full_, 1);
  299. } else {
  300. if (Clock::now() > when) {
  301. return false;
  302. } else {
  303. detail::futexWaitUntil(&full_, 1, when);
  304. }
  305. }
  306. } // inner while loop
  307. } // outer while loop
  308. }
  309. template <
  310. typename T,
  311. typename PriorityQueue,
  312. typename Mutex,
  313. template <typename> class Atom>
  314. template <typename Clock, typename Duration>
  315. inline bool
  316. FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_pop_impl(
  317. T& val,
  318. const std::chrono::time_point<Clock, Duration>& when) {
  319. while (true) {
  320. bool res;
  321. bool wake;
  322. auto fn = [&] {
  323. res = !pq_.empty();
  324. if (res) {
  325. val = pq_.top();
  326. pq_.pop();
  327. wake = futexSignal(full_);
  328. } else {
  329. setFutex(empty_, 1);
  330. }
  331. };
  332. this->requestFC(fn);
  333. if (res) {
  334. if (wake) {
  335. detail::futexWake(&full_);
  336. }
  337. return true;
  338. }
  339. while (isTrue(empty_)) {
  340. if (when == std::chrono::time_point<Clock>::max()) {
  341. detail::futexWait(&empty_, 1);
  342. } else {
  343. if (Clock::now() > when) {
  344. return false;
  345. } else {
  346. detail::futexWaitUntil(&empty_, 1, when);
  347. }
  348. }
  349. } // inner while loop
  350. } // outer while loop
  351. }
  352. template <
  353. typename T,
  354. typename PriorityQueue,
  355. typename Mutex,
  356. template <typename> class Atom>
  357. template <typename Clock, typename Duration>
  358. inline bool
  359. FlatCombiningPriorityQueue<T, PriorityQueue, Mutex, Atom>::try_peek_impl(
  360. T& val,
  361. const std::chrono::time_point<Clock, Duration>& when) {
  362. while (true) {
  363. bool res;
  364. auto fn = [&] {
  365. res = !pq_.empty();
  366. if (res) {
  367. val = pq_.top();
  368. } else {
  369. setFutex(empty_, 1);
  370. }
  371. };
  372. this->requestFC(fn);
  373. if (res) {
  374. return true;
  375. }
  376. while (isTrue(empty_)) {
  377. if (when == std::chrono::time_point<Clock>::max()) {
  378. detail::futexWait(&empty_, 1);
  379. } else {
  380. if (Clock::now() > when) {
  381. return false;
  382. } else {
  383. detail::futexWaitUntil(&empty_, 1, when);
  384. }
  385. }
  386. } // inner while loop
  387. } // outer while loop
  388. }
  389. } // namespace folly