// ProducerConsumerQueue.h
/*
 * Copyright 2012-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)

#pragma once
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

#include <folly/concurrency/CacheLocality.h>
  27. namespace folly {
  28. /*
  29. * ProducerConsumerQueue is a one producer and one consumer queue
  30. * without locks.
  31. */
  32. template <class T>
  33. struct ProducerConsumerQueue {
  34. typedef T value_type;
  35. ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  36. ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
  37. // size must be >= 2.
  38. //
  39. // Also, note that the number of usable slots in the queue at any
  40. // given time is actually (size-1), so if you start with an empty queue,
  41. // isFull() will return true after size-1 insertions.
  42. explicit ProducerConsumerQueue(uint32_t size)
  43. : size_(size),
  44. records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
  45. readIndex_(0),
  46. writeIndex_(0) {
  47. assert(size >= 2);
  48. if (!records_) {
  49. throw std::bad_alloc();
  50. }
  51. }
  52. ~ProducerConsumerQueue() {
  53. // We need to destruct anything that may still exist in our queue.
  54. // (No real synchronization needed at destructor time: only one
  55. // thread can be doing this.)
  56. if (!std::is_trivially_destructible<T>::value) {
  57. size_t readIndex = readIndex_;
  58. size_t endIndex = writeIndex_;
  59. while (readIndex != endIndex) {
  60. records_[readIndex].~T();
  61. if (++readIndex == size_) {
  62. readIndex = 0;
  63. }
  64. }
  65. }
  66. std::free(records_);
  67. }
  68. template <class... Args>
  69. bool write(Args&&... recordArgs) {
  70. auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
  71. auto nextRecord = currentWrite + 1;
  72. if (nextRecord == size_) {
  73. nextRecord = 0;
  74. }
  75. if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
  76. new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
  77. writeIndex_.store(nextRecord, std::memory_order_release);
  78. return true;
  79. }
  80. // queue is full
  81. return false;
  82. }
  83. // move (or copy) the value at the front of the queue to given variable
  84. bool read(T& record) {
  85. auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  86. if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  87. // queue is empty
  88. return false;
  89. }
  90. auto nextRecord = currentRead + 1;
  91. if (nextRecord == size_) {
  92. nextRecord = 0;
  93. }
  94. record = std::move(records_[currentRead]);
  95. records_[currentRead].~T();
  96. readIndex_.store(nextRecord, std::memory_order_release);
  97. return true;
  98. }
  99. // pointer to the value at the front of the queue (for use in-place) or
  100. // nullptr if empty.
  101. T* frontPtr() {
  102. auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  103. if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
  104. // queue is empty
  105. return nullptr;
  106. }
  107. return &records_[currentRead];
  108. }
  109. // queue must not be empty
  110. void popFront() {
  111. auto const currentRead = readIndex_.load(std::memory_order_relaxed);
  112. assert(currentRead != writeIndex_.load(std::memory_order_acquire));
  113. auto nextRecord = currentRead + 1;
  114. if (nextRecord == size_) {
  115. nextRecord = 0;
  116. }
  117. records_[currentRead].~T();
  118. readIndex_.store(nextRecord, std::memory_order_release);
  119. }
  120. bool isEmpty() const {
  121. return readIndex_.load(std::memory_order_acquire) ==
  122. writeIndex_.load(std::memory_order_acquire);
  123. }
  124. bool isFull() const {
  125. auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
  126. if (nextRecord == size_) {
  127. nextRecord = 0;
  128. }
  129. if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
  130. return false;
  131. }
  132. // queue is full
  133. return true;
  134. }
  135. // * If called by consumer, then true size may be more (because producer may
  136. // be adding items concurrently).
  137. // * If called by producer, then true size may be less (because consumer may
  138. // be removing items concurrently).
  139. // * It is undefined to call this from any other thread.
  140. size_t sizeGuess() const {
  141. int ret = writeIndex_.load(std::memory_order_acquire) -
  142. readIndex_.load(std::memory_order_acquire);
  143. if (ret < 0) {
  144. ret += size_;
  145. }
  146. return ret;
  147. }
  148. // maximum number of items in the queue.
  149. size_t capacity() const {
  150. return size_ - 1;
  151. }
  152. private:
  153. using AtomicIndex = std::atomic<unsigned int>;
  154. char pad0_[hardware_destructive_interference_size];
  155. const uint32_t size_;
  156. T* const records_;
  157. alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
  158. alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
  159. char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
  160. };
  161. } // namespace folly