快速且无锁的单个写入器,多个 Reader

Fast and Lock Free Single Writer, Multiple Reader

我有一个作家必须以相当高的频率递增一个变量,还有一个或多个 reader 以较低的频率访问这个变量。

写入由外部中断触发。

因为我需要高速写入,所以我不想使用互斥锁或其他昂贵的锁定机制。

我想出的方法是在写入后复制值。 reader 现在可以比较原件和副本。如果它们相等,则变量的内容有效。

这是我在 C++ 中的实现

template<typename T>
class SafeValue
{
private:
    volatile T _value;
    volatile T _valueCheck;
public:
    void setValue(T newValue)
    {
        _value = newValue;
        _valueCheck = _value;
    }

    T getValue()
    {
        volatile T value;
        volatile T valueCheck;
        do
        {
            valueCheck = _valueCheck;
            value = _value;
        } while(value != valueCheck);

        return value;
    }
}

这背后的想法是在读取时检测数据竞争,如果发生则重试。但是,我不知道这是否会一直有效。我还没有在网上找到关于这个方法的任何信息,因此我的问题是:

当我的方法与单个作者和多个 reader 一起使用时有什么问题吗?

我已经知道写入频率过高可能会导致reader饿死。是否有更多不良影响需要我注意?难道这根本不是线程安全的吗?

编辑 1:

我的目标系统是 ARM Cortex-A15。

T 应该至少可以变成任何原始整数类型。

编辑 2:

std::atomic 在 reader 和作家网站上太慢了。我在我的系统上对它进行了基准测试。与未受保护的原始操作相比,写入速度大约慢 30 倍,读取速度大约慢 50 倍。

这个单一变量只是一个整数、指针还是普通的旧值类型,您可以只使用 std::atomic.

另一种选择是使用发布者生成的非原子值缓冲区和指向最新值的原子指针。

#include <atomic>
#include <utility>

template<class T>
class PublisherValue {
    static auto constexpr N = 32;
    T values_[N];
    std::atomic<T*> current_{values_};

public:
    PublisherValue() = default;
    PublisherValue(PublisherValue const&) = delete;
    PublisherValue& operator=(PublisherValue const&) = delete;

    // Single writer thread only.
    template<class U>
    void store(U&& value) {
        T* p = current_.load(std::memory_order_relaxed);
        if(++p == values_ + N)
            p = values_;
        *p = std::forward<U>(value);
        current_.store(p, std::memory_order_release); // (1) 
    }

    // Multiple readers. Make a copy to avoid referring the value for too long.
    T load() const {
        return *current_.load(std::memory_order_consume); // Sync with (1).
    }
};

This is wait-free,但在复制值时 reader 可能会被取消调度的可能性很小,因此在它已被部分覆盖时读取最旧的值。使 N 变大可以降低这种风险。

不可能有人知道。您将不得不查看您的编译器是否记录了任何多线程语义来保证这将起作用,或者查看生成的汇编代码并说服自己它会起作用。请注意,在后一种情况下,更高版本的编译器、不同的优化选项或更新的 CPU 总是有可能破坏代码。

我建议使用适当的 memory_order 测试 std::atomic。如果由于某种原因太慢,请使用内联汇编。

您应该首先尝试使用 std::atomic,但要确保您的编译器了解并理解您的目标体系结构。由于您的目标是 Cortex-A15 (ARMv7-A cpu),请确保使用 -march=armv7-a 甚至 -mcpu=cortex-a15.

第一个应生成 ldrexd 指令,根据 ARM 文档,该指令应该是原子的:

Single-copy atomicity

In ARMv7, the single-copy atomic processor accesses are:

  • all byte accesses
  • all halfword accesses to halfword-aligned locations
  • all word accesses to word-aligned locations
  • memory accesses caused by LDREXD and STREXD instructions to doubleword-aligned locations.

后者应生成 ldrd 指令,该指令在支持大型物理地址扩展的目标上应该是原子的:

In an implementation that includes the Large Physical Address Extension, LDRD and STRD accesses to 64-bit aligned locations are 64-bit single-copy atomic as seen by translation table walks and accesses to translation tables.

--- Note ---

The Large Physical Address Extension adds this requirement to avoid the need to complex measures to avoid atomicity issues when changing translation table entries, without creating a requirement that all locations in the memory system are 64-bit single-copy atomic.

你也可以查看Linux内核如何implements那些:

#ifdef CONFIG_ARM_LPAE
static inline long long atomic64_read(const atomic64_t *v)
{
    long long result;

    __asm__ __volatile__("@ atomic64_read\n"
"   ldrd    %0, %H0, [%1]"
    : "=&r" (result)
    : "r" (&v->counter), "Qo" (v->counter)
    );

    return result;
}
#else
static inline long long atomic64_read(const atomic64_t *v)
{
    long long result;

    __asm__ __volatile__("@ atomic64_read\n"
"   ldrexd  %0, %H0, [%1]"
    : "=&r" (result)
    : "r" (&v->counter), "Qo" (v->counter)
    );

    return result;
}
#endif