From 197d7565603b2e8274f5c176f73b468ce6aa46a6 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 15:17:21 -0400 Subject: bounded_threadsafe_queue: Refactor Pop Introduces PopModes to bring waiting logic into Pop, similar to Push. --- src/common/bounded_threadsafe_queue.h | 202 +++++++++++----------------------- 1 file changed, 62 insertions(+), 140 deletions(-) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 0fb2f42d1..bd87aa09b 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,52 +22,38 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: - bool TryPush(T&& t) { - return Push(std::move(t)); - } - template bool TryEmplace(Args&&... args) { return Emplace(std::forward(args)...); } - void PushWait(T&& t) { - Push(std::move(t)); - } - template void EmplaceWait(Args&&... args) { Emplace(std::forward(args)...); } bool TryPop(T& t) { - return Pop(t); + return Pop(t); + } + + void PopWait(T& t) { + Pop(t); } void PopWait(T& t, std::stop_token stop_token) { - ConsumerWait(stop_token); - Pop(t); + Pop(t, stop_token); } - T PopWait(std::stop_token stop_token) { - ConsumerWait(stop_token); + T PopWait() { T t; - Pop(t); + Pop(t); return t; } - void Clear() { - while (!Empty()) { - Pop(); - } - } - - bool Empty() const { - return m_read_index.load() == m_write_index.load(); - } - - size_t Size() const { - return m_write_index.load() - m_read_index.load(); + T PopWait(std::stop_token stop_token) { + T t; + Pop(t, stop_token); + return t; } private: @@ -77,55 +63,27 @@ private: Count, }; - template - bool Push(T&& t) { - const size_t write_index = m_write_index.load(); - - if constexpr (Mode == PushMode::Try) { - // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { - return false; - } - } else if constexpr (Mode == PushMode::Wait) { - // Wait until we have free slots to write to. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; - }); - } else { - static_assert(Mode < PushMode::Count, "Invalid PushMode."); - } - - // Determine the position to write to. - const size_t pos = write_index % Capacity; - - // Push into the queue. - m_data[pos] = std::move(t); - - // Increment the write index. - ++m_write_index; - - // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{consumer_cv_mutex}; - consumer_cv.notify_one(); - - return true; - } + enum class PopMode { + Try, + Wait, + WaitWithStopToken, + Count, + }; template bool Emplace(Args&&... args) { - const size_t write_index = m_write_index.load(); + const size_t write_index = m_write_index.load(std::memory_order::relaxed); if constexpr (Mode == PushMode::Try) { // Check if we have free slots to write to. - if ((write_index - m_read_index.load()) == Capacity) { + if ((write_index - m_read_index.load(std::memory_order::acquire)) == Capacity) { return false; } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. std::unique_lock lock{producer_cv_mutex}; producer_cv.wait(lock, [this, write_index] { - return (write_index - m_read_index.load()) < Capacity; + return (write_index - m_read_index.load(std::memory_order::acquire)) < Capacity; }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); @@ -147,34 +105,32 @@ private: return true; } - void Pop() { - const size_t read_index = m_read_index.load(); - - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return; - } - - // Determine the position to read from. - const size_t pos = read_index % Capacity; - - // Pop the data off the queue, deleting it. - std::destroy_at(std::addressof(m_data[pos])); - - // Increment the read index. - ++m_read_index; - - // Notify the producer that we have popped off the queue. - std::unique_lock lock{producer_cv_mutex}; - producer_cv.notify_one(); - } - - bool Pop(T& t) { - const size_t read_index = m_read_index.load(); + template + bool Pop(T& t, [[maybe_unused]] std::stop_token stop_token = {}) { + const size_t read_index = m_read_index.load(std::memory_order::relaxed); - // Check if the queue is empty. - if (read_index == m_write_index.load()) { - return false; + if constexpr (Mode == PopMode::Try) { + // Check if the queue is empty. + if (read_index == m_write_index.load(std::memory_order::acquire)) { + return false; + } + } else if constexpr (Mode == PopMode::Wait) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + consumer_cv.wait(lock, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + } else if constexpr (Mode == PopMode::WaitWithStopToken) { + // Wait until the queue is not empty. + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this, read_index] { + return read_index != m_write_index.load(std::memory_order::acquire); + }); + if (stop_token.stop_requested()) { + return false; + } + } else { + static_assert(Mode < PopMode::Count, "Invalid PopMode."); } // Determine the position to read from. @@ -193,11 +149,6 @@ private: return true; } - void ConsumerWait(std::stop_token stop_token) { - std::unique_lock lock{consumer_cv_mutex}; - Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); - } - alignas(128) std::atomic_size_t m_read_index{0}; alignas(128) std::atomic_size_t m_write_index{0}; @@ -212,22 +163,12 @@ private: template class MPSCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; @@ -238,24 +179,20 @@ public: return spsc_queue.TryPop(t); } - void PopWait(T& t, std::stop_token stop_token) { - spsc_queue.PopWait(t, stop_token); + void PopWait(T& t) { + spsc_queue.PopWait(t); } - T PopWait(std::stop_token stop_token) { - return spsc_queue.PopWait(stop_token); - } - - void Clear() { - spsc_queue.Clear(); + void PopWait(T& t, std::stop_token stop_token) { + spsc_queue.PopWait(t, stop_token); } - bool Empty() { - return spsc_queue.Empty(); + T PopWait() { + return spsc_queue.PopWait(); } - size_t Size() { - return spsc_queue.Size(); + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); } private: @@ -266,22 +203,12 @@ private: template class MPMCQueue { public: - bool TryPush(T&& t) { - std::scoped_lock lock{write_mutex}; - return spsc_queue.TryPush(std::move(t)); - } - template bool TryEmplace(Args&&... args) { std::scoped_lock lock{write_mutex}; return spsc_queue.TryEmplace(std::forward(args)...); } - void PushWait(T&& t) { - std::scoped_lock lock{write_mutex}; - spsc_queue.PushWait(std::move(t)); - } - template void EmplaceWait(Args&&... args) { std::scoped_lock lock{write_mutex}; @@ -293,29 +220,24 @@ public: return spsc_queue.TryPop(t); } - void PopWait(T& t, std::stop_token stop_token) { - std::scoped_lock lock{read_mutex}; - spsc_queue.PopWait(t, stop_token); - } - - T PopWait(std::stop_token stop_token) { + void PopWait(T& t) { std::scoped_lock lock{read_mutex}; - return spsc_queue.PopWait(stop_token); + spsc_queue.PopWait(t); } - void Clear() { + void PopWait(T& t, std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; - spsc_queue.Clear(); + spsc_queue.PopWait(t, stop_token); } - bool Empty() { + T PopWait() { std::scoped_lock lock{read_mutex}; - return spsc_queue.Empty(); + return spsc_queue.PopWait(); } - size_t Size() { + T PopWait(std::stop_token stop_token) { std::scoped_lock lock{read_mutex}; - return spsc_queue.Size(); + return spsc_queue.PopWait(stop_token); } private: -- cgit v1.2.3