semi-lock-free spsc queue with release-acquire memory ordering

  atomic, c++, concurrency, memory-barriers

The following is a simplified implementation of a semi-lock-free SPSC queue. I intentionally simplified it so that it is easier to read and the deadlock is easier to reproduce.

The problem is the memory ordering of the operations on write_pos_ and read_pos_ relative to those on enqueuer_in_sleep_ and dequeuer_in_sleep_, which can lead to a deadlock in the circumstances described in the code comments.

Questions:

  1. What solution fixes the problem while keeping release-acquire ordering for the operations on write_pos_ and read_pos_? Using a semaphore, a condition variable, or the C++20 atomic wait/notify functions causes the same performance degradation as using seq_cst memory ordering.
  2. Is there any viable way to load write_pos_ and read_pos_ and be sure that the load returns the latest value stored with release memory order (for example, by adding some sort of delay)?
  3. Because this problem occurs only in edge cases, the delay workaround mentioned in the code comments preserves the release-acquire ordering, and the performance of the queue stays comparable to lock-free queues (nearly 400M enqueues/dequeues per second). Using any sort of lightweight semaphore drops the performance to 60M enqueues/dequeues per second. Is this degradation because semaphores use xchg on each release()/acquire() call, which invokes the processor’s locking protocol?
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static void wait_on_address(std::atomic<uint32_t>* ptr, uint32_t val)
{
    do
    {
        syscall(SYS_futex, reinterpret_cast<uint32_t*>(ptr), FUTEX_WAIT_PRIVATE, val, NULL, 0, 0);
    } while (ptr->load(std::memory_order_acquire) == val); /* check for spurious wakeup */
}

static void wake_by_address(std::atomic<uint32_t>* ptr)
{
    syscall(SYS_futex, reinterpret_cast<uint32_t*>(ptr), FUTEX_WAKE_PRIVATE, 1, 0, 0, 0);
}

template<typename T, uint32_t S>
class spsc
{
    alignas(64) std::array<T, S> buffer_{};
    alignas(64) std::atomic<uint32_t> write_pos_{};
    alignas(64) std::atomic<uint32_t> read_pos_{};
    alignas(64) std::atomic<bool> enqueuer_in_sleep_{};
    alignas(64) std::atomic<bool> dequeuer_in_sleep_{};

  public:
    void enqueue(const T& value)
    {
        const auto write_pos = write_pos_.load(std::memory_order_relaxed);

        auto next_write_pos = 1 + write_pos;
        if (next_write_pos == buffer_.size())
            next_write_pos = 0;

        uint8_t retries = 0;
        while (next_write_pos == read_pos_.load(std::memory_order_acquire))
        {
            if (++retries > 16)
            {
                /*
                    Suppose that, before we set enqueuer_in_sleep_ on the next line, the other
                    thread has entered dequeue() and updated read_pos_. By the time we set
                    enqueuer_in_sleep_ to true and reach wait_on_address(), read_pos_ has
                    already changed, so the kernel should return immediately. That is not
                    always the case: because the other thread updated read_pos_ with release
                    memory order, we may still read its previous value, and wait_on_address()
                    goes to sleep even though the queue is no longer full.

                    Changing the memory order of the store to read_pos_ from release to seq_cst
                    solves the problem, but overall performance drops several-fold.
                */
                enqueuer_in_sleep_.store(true, std::memory_order_seq_cst);
                /*
                    If we add a short delay here:
                    std::this_thread::sleep_for(std::chrono::microseconds{1});

                    then by the time we reach wait_on_address() we can be reasonably (but not
                    completely) sure that we read the last store to read_pos_ (last relative to
                    the change of enqueuer_in_sleep_). This preserves release-acquire ordering,
                    but it does not seem to be a viable solution.
                */
                wait_on_address(&read_pos_, next_write_pos);
                enqueuer_in_sleep_.store(false, std::memory_order_seq_cst);
                break;
            }
        }

        buffer_[write_pos] = value;

        write_pos_.store(next_write_pos, std::memory_order_release);

        if (dequeuer_in_sleep_.load(std::memory_order_seq_cst))
        {
            wake_by_address(&write_pos_);
        }
    }

    void dequeue(T& value)
    {
        auto const read_pos = read_pos_.load(std::memory_order_relaxed);

        uint8_t retries = 0;
        while (read_pos == write_pos_.load(std::memory_order_acquire))
        {
            if (++retries > 16)
            {
                dequeuer_in_sleep_.store(true, std::memory_order_seq_cst);
                /*
                    The same problem can happen here, with write_pos_ and dequeuer_in_sleep_
                    in place of read_pos_ and enqueuer_in_sleep_.
                */
                wait_on_address(&write_pos_, read_pos);
                dequeuer_in_sleep_.store(false, std::memory_order_seq_cst);
                break;
            }
        }

        value = buffer_[read_pos];

        auto next_read_pos = read_pos + 1;
        if (next_read_pos == buffer_.size())
            next_read_pos = 0;

        read_pos_.store(next_read_pos, std::memory_order_release);

        if (enqueuer_in_sleep_.load(std::memory_order_seq_cst))
        {
            wake_by_address(&read_pos_);
        }
    }
};

int main()
{
    auto attempts = 0;
    for (;;)
    {
        constexpr uint64_t iterations = 100'000;
        spsc<uint64_t, 4> spsc;

        std::thread t([&] {
            for (uint64_t i = 0; i < iterations; i++)
            {
                uint64_t v;
                spsc.dequeue(v);
            }
        });

        for (uint64_t i = 0; i < iterations; i++)
        {
            spsc.enqueue(i);
        }

        t.join();

        std::cout << "attempts to deadlock: " << ++attempts << std::endl;
    }
}
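
One way to picture the race described in the code comments: each side performs a store followed by a load of a different variable (store read_pos_, then load enqueuer_in_sleep_; store enqueuer_in_sleep_, then check read_pos_), and a release store does not prevent the later load from being satisfied first. The sketch below is a minimal illustration of the usual store/fence/load handshake, not a drop-in fix. The names `wake_handshake`, `publish`, `prepare_sleep` and `finish_sleep` are hypothetical, `pos` stands in for read_pos_ or write_pos_, and the futex calls are left out. Note that the seq_cst fence sits on the fast path, so on x86 it is still a full barrier and may cost roughly what the seq_cst store does.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper for illustration only; not part of the original queue.
struct wake_handshake
{
    std::atomic<uint32_t> pos{0};           // stands in for read_pos_ / write_pos_
    std::atomic<bool> peer_in_sleep{false}; // stands in for *_in_sleep_

    // Fast path: publish the new index, then decide whether a wake is needed.
    // Returns true when the caller should call wake_by_address().
    bool publish(uint32_t next)
    {
        pos.store(next, std::memory_order_release);
        // Full fence: the index store is ordered before the flag load below.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        return peer_in_sleep.load(std::memory_order_relaxed);
    }

    // Slow path: announce the intent to sleep, then re-check the index.
    // Returns true when the index is still `blocked_at`, i.e. blocking is allowed.
    bool prepare_sleep(uint32_t blocked_at)
    {
        // Single sleeper per flag (SPSC), so the flag must be clear on entry.
        assert(!peer_in_sleep.load(std::memory_order_relaxed));
        peer_in_sleep.store(true, std::memory_order_relaxed);
        // Pairs with the fence in publish(): at least one side sees the other's store.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (pos.load(std::memory_order_acquire) != blocked_at)
        {
            peer_in_sleep.store(false, std::memory_order_relaxed);
            return false; // index already moved, no need to sleep
        }
        return true;
    }

    // Called by the sleeper after it has been woken.
    void finish_sleep()
    {
        peer_in_sleep.store(false, std::memory_order_relaxed);
    }
};
```

With both fences in place, either publish() observes the flag and wakes the peer, or prepare_sleep() observes the new index and skips the sleep. Whether this is cheaper than a seq_cst store in practice would need to be measured.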

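Regarding the delay idea in the code comments, a bounded wait is one possible alternative to sleeping before wait_on_address(): if a wake-up is missed, the sleeper re-checks the index after a timeout instead of blocking forever, and the release stores on the fast path stay as they are. The sketch below assumes Linux and the same raw futex syscall used in the post. `wait_on_address_for` is a hypothetical helper, and any timeout value is an arbitrary choice. It trades the deadlock for bounded extra latency plus periodic wake-ups while idle, so it mitigates the race rather than removing it.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <time.h>

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Hypothetical helper: like wait_on_address(), but gives up after `timeout`.
// Returns true once *ptr != val, false if the timeout expired first.
static bool wait_on_address_for(std::atomic<uint32_t>* ptr, uint32_t val,
                                std::chrono::nanoseconds timeout)
{
    assert(ptr != nullptr);
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    while (ptr->load(std::memory_order_acquire) == val)
    {
        const auto left = deadline - std::chrono::steady_clock::now();
        if (left <= std::chrono::nanoseconds::zero())
            return false; // timed out, value unchanged

        const auto ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(left).count();
        timespec ts{};
        ts.tv_sec = static_cast<time_t>(ns / 1'000'000'000);
        ts.tv_nsec = static_cast<long>(ns % 1'000'000'000);

        // FUTEX_WAIT takes a relative timeout; the loop handles spurious
        // wake-ups, EINTR and ETIMEDOUT by re-checking value and deadline.
        syscall(SYS_futex, reinterpret_cast<uint32_t*>(ptr), FUTEX_WAIT_PRIVATE,
                val, &ts, nullptr, 0);
    }
    return true;
}
```

In enqueue(), the call to wait_on_address(&read_pos_, next_write_pos) could then become a loop such as `while (!wait_on_address_for(&read_pos_, next_write_pos, std::chrono::microseconds{50})) {}`, where the 50 microseconds is only a placeholder.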
Source: Windows Questions C++
