PriorityMPMCQueue.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <glog/logging.h>
  18. #include <algorithm>
  19. #include <vector>
  20. #include <folly/MPMCQueue.h>
  21. namespace folly {
  22. /// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
  23. /// by managing multiple underlying MPMCQueues. As of now, this does
  24. /// not implement a blocking interface. For the purposes of this
  25. /// class, lower number is higher priority
  26. template <
  27. typename T,
  28. template <typename> class Atom = std::atomic,
  29. bool Dynamic = false>
  30. class PriorityMPMCQueue {
  31. public:
  32. PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
  33. CHECK_GT(numPriorities, 0);
  34. queues_.reserve(numPriorities);
  35. for (size_t i = 0; i < numPriorities; i++) {
  36. queues_.emplace_back(capacity);
  37. }
  38. }
  39. size_t getNumPriorities() {
  40. return queues_.size();
  41. }
  42. // Add at medium priority by default
  43. bool write(T&& item) {
  44. return writeWithPriority(std::move(item), getNumPriorities() / 2);
  45. }
  46. bool writeWithPriority(T&& item, size_t priority) {
  47. size_t queue = std::min(getNumPriorities() - 1, priority);
  48. CHECK_LT(queue, queues_.size());
  49. return queues_.at(queue).write(std::move(item));
  50. }
  51. bool writeWithPriority(
  52. T&& item,
  53. size_t priority,
  54. std::chrono::milliseconds timeout) {
  55. size_t queue = std::min(getNumPriorities() - 1, priority);
  56. CHECK_LT(queue, queues_.size());
  57. return queues_.at(queue).tryWriteUntil(
  58. std::chrono::steady_clock::now() + timeout, std::move(item));
  59. }
  60. bool read(T& item) {
  61. for (auto& q : queues_) {
  62. if (q.readIfNotEmpty(item)) {
  63. return true;
  64. }
  65. }
  66. return false;
  67. }
  68. bool readWithPriority(T& item, size_t priority) {
  69. return queues_[priority].readIfNotEmpty(item);
  70. }
  71. size_t size() const {
  72. size_t total_size = 0;
  73. for (auto& q : queues_) {
  74. // MPMCQueue can have a negative size if there are pending readers.
  75. // Since we don't expose a blocking interface this shouldn't happen,
  76. // But just in case we put a floor at 0
  77. total_size += std::max<ssize_t>(0, q.size());
  78. }
  79. return total_size;
  80. }
  81. size_t sizeGuess() const {
  82. size_t total_size = 0;
  83. for (auto& q : queues_) {
  84. // MPMCQueue can have a negative size if there are pending readers.
  85. // Since we don't expose a blocking interface this shouldn't happen,
  86. // But just in case we put a floor at 0
  87. total_size += std::max<ssize_t>(0, q.sizeGuess());
  88. }
  89. return total_size;
  90. }
  91. /// Returns true if there are no items available for dequeue
  92. bool isEmpty() const {
  93. return size() == 0;
  94. }
  95. private:
  96. std::vector<folly::MPMCQueue<T, Atom, Dynamic>> queues_;
  97. };
  98. } // namespace folly