123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- /*
- * Copyright 2012-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- // @author Bo Hu (bhu@fb.com)
- // @author Jordan DeLong (delong.j@fb.com)
- #pragma once
- #include <atomic>
- #include <cassert>
- #include <cstdlib>
- #include <memory>
- #include <stdexcept>
- #include <type_traits>
- #include <utility>
- #include <folly/concurrency/CacheLocality.h>
- namespace folly {
- /*
- * ProducerConsumerQueue is a one producer and one consumer queue
- * without locks.
- */
- template <class T>
- struct ProducerConsumerQueue {
- typedef T value_type;
- ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
- ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
- // size must be >= 2.
- //
- // Also, note that the number of usable slots in the queue at any
- // given time is actually (size-1), so if you start with an empty queue,
- // isFull() will return true after size-1 insertions.
- explicit ProducerConsumerQueue(uint32_t size)
- : size_(size),
- records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
- readIndex_(0),
- writeIndex_(0) {
- assert(size >= 2);
- if (!records_) {
- throw std::bad_alloc();
- }
- }
- ~ProducerConsumerQueue() {
- // We need to destruct anything that may still exist in our queue.
- // (No real synchronization needed at destructor time: only one
- // thread can be doing this.)
- if (!std::is_trivially_destructible<T>::value) {
- size_t readIndex = readIndex_;
- size_t endIndex = writeIndex_;
- while (readIndex != endIndex) {
- records_[readIndex].~T();
- if (++readIndex == size_) {
- readIndex = 0;
- }
- }
- }
- std::free(records_);
- }
- template <class... Args>
- bool write(Args&&... recordArgs) {
- auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
- auto nextRecord = currentWrite + 1;
- if (nextRecord == size_) {
- nextRecord = 0;
- }
- if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
- new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
- writeIndex_.store(nextRecord, std::memory_order_release);
- return true;
- }
- // queue is full
- return false;
- }
- // move (or copy) the value at the front of the queue to given variable
- bool read(T& record) {
- auto const currentRead = readIndex_.load(std::memory_order_relaxed);
- if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
- // queue is empty
- return false;
- }
- auto nextRecord = currentRead + 1;
- if (nextRecord == size_) {
- nextRecord = 0;
- }
- record = std::move(records_[currentRead]);
- records_[currentRead].~T();
- readIndex_.store(nextRecord, std::memory_order_release);
- return true;
- }
- // pointer to the value at the front of the queue (for use in-place) or
- // nullptr if empty.
- T* frontPtr() {
- auto const currentRead = readIndex_.load(std::memory_order_relaxed);
- if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
- // queue is empty
- return nullptr;
- }
- return &records_[currentRead];
- }
- // queue must not be empty
- void popFront() {
- auto const currentRead = readIndex_.load(std::memory_order_relaxed);
- assert(currentRead != writeIndex_.load(std::memory_order_acquire));
- auto nextRecord = currentRead + 1;
- if (nextRecord == size_) {
- nextRecord = 0;
- }
- records_[currentRead].~T();
- readIndex_.store(nextRecord, std::memory_order_release);
- }
- bool isEmpty() const {
- return readIndex_.load(std::memory_order_acquire) ==
- writeIndex_.load(std::memory_order_acquire);
- }
- bool isFull() const {
- auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
- if (nextRecord == size_) {
- nextRecord = 0;
- }
- if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
- return false;
- }
- // queue is full
- return true;
- }
- // * If called by consumer, then true size may be more (because producer may
- // be adding items concurrently).
- // * If called by producer, then true size may be less (because consumer may
- // be removing items concurrently).
- // * It is undefined to call this from any other thread.
- size_t sizeGuess() const {
- int ret = writeIndex_.load(std::memory_order_acquire) -
- readIndex_.load(std::memory_order_acquire);
- if (ret < 0) {
- ret += size_;
- }
- return ret;
- }
- // maximum number of items in the queue.
- size_t capacity() const {
- return size_ - 1;
- }
- private:
- using AtomicIndex = std::atomic<unsigned int>;
- char pad0_[hardware_destructive_interference_size];
- const uint32_t size_;
- T* const records_;
- alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
- alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
- char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
- };
- } // namespace folly
|