更新具有更大值的数组元素最少需要哪些内存屏障?

Which memory barriers are minimally needed for updating array elements with greater values?

在以下场景中,最低限度需要的内存屏障是什么?

多个线程并行更新数组的元素int a[n]。 所有元素最初都设置为零。 每个线程为每个元素计算一个新值;然后, 它将计算出的新值与存储在数组中的现有值进行比较, 并且仅当新值大于存储值时才写入新值。 例如,如果一个线程为 a[0] 计算一个新值 5,但是 a[0] 已经是 10,那么线程不应该更新 a[0]。 但是如果线程计算出一个新值 10,并且 a[0]5, 那么线程必须更新 a[0].

新值的计算涉及一些共享的只读数据; 它根本不涉及数组。

虽然上述线程是运行,但没有其他线程访问该数组。 在保证所有线程完成更新之后,稍后使用该数组。

实现使用比较和交换循环,包装元素 进入 atomic_ref(来自 Boost 或来自 C++20):

for (int k = 0; k != n; ++k) // For each element of the array
{
    // Locally in this thread, compute the new value for a[k].
    int new_value = ComputeTheNewValue(k);

    // Establish atomic access to a[k].
    atomic_ref<int> memory(a[k]);

    // [Barrier_0]: Read the existing value.
    int existing_value = memory.load(memory_order_relaxed);

    while (true) // The compare-and-swap loop.
    {
        // Overwrite only with higher values.
        if (new_value <= existing_value)
            break;

        // Need to update a[k] with the higher value "new_value", but
        // only if a[k] still stores the "existing_value".
        if (memory.compare_exchange_weak(existing_value, new_value,
                   /*Barrier_1*/         memory_order_relaxed,
                   /*Barrier_2*/         memory_order_relaxed))
        {
            // a[k] was still storing "existing_value", and it has been
            // successfully updated with the higher "new_value".
            // We're done, and we may exit the compare-and-swap loop.
            break;
        }
        else
        {
            // We get here in two cases:
            //  1. a[k] was found to store a value different from "existing_value", or
            //  2. the compare-and-swap operation has failed spuriously.
            // In the first case, the new value stored in a[k] has been loaded
            // by compare_exchange_weak() function into the "existing_value" variable.
            // Then, we need to compare the "new_value" produced by this thread
            // with the newly loaded "existing_value". This is achieved by simply continuing the loop.
            // The second case (the spurious failure) is also handled by continuing the loop,
            // although in that case the "new_value <= existing_value" comparison is redundant.
            continue;
        }
    }

}

这段代码涉及三个内存屏障:

  1. Barrier_0memory.load().
  2. Barrier_1,当compare_exchange_weak()成功时用于read-modify-write。
  3. Barrier_2,当compare_exchange_weak()失败时用于加载操作。

在这种情况下,代码是否保证只更新更高的值 当所有三个障碍都设置为 relaxed 时? 如果不是,需要什么最小障碍来保证正确的行为?

轻松就好,不需要点单。在更新过程中访问任何其他元素。而对于同一位置的访问,ISO C++保证每个位置分别存在一个“修改顺序”,即使 relaxed 操作也只会看到加载或加载之间位置修改顺序中相同或更晚的值RM星期三

您只是在 CAS 重试循环之外构建原子 fetch_max 原语。由于其他作者都在做同样的事情,所以每个位置的值都是单调递增的。因此,只要您看到大于 new_value.

的值,就可以完全安全地退出。

为了让主线程在最后收集结果,您确实需要 release/acquire 同步,例如 thread.join 或某种标志。 (例如,可能 fetch_sub(1, release) 有多少线程还有工作要做的计数器,或者 done 标志数组,这样你就可以做一个纯粹的存储。)


顺便说一句,这似乎很慢,等待缓存行在内核之间反弹需要花费大量时间。 (很多false-sharing。)理想情况下,您可以有效地更改它以使每个线程在数组的不同部分上工作(例如,为同一索引计算多个候选对象,因此它不需要任何原子内容)。

I cannot guarantee that the computed indices do not overlap. In practice, the overlapping is usually small, but it cannot be eliminated.

很显然这是一个否定。如果不同线程接触的索引位于不同的缓存行(16 int32_t 的块),那么就不会有太多的错误共享。 (此外,如果计算成本很高,所以您不能非常快地生成值,那很好,所以原子更新不是您的代码花费大部分时间的地方。)

但是如果 存在 显着的争用并且数组不是很大,您可以为每个线程提供自己的输出数组 ,并且最后收集结果。例如让一个线程为每个循环执行 a[i] = max(a[i], b[i], c[i], d[i]) 4 到 8 个数组。 (不是太多一次读取流,也不是可变数量的输入,因为这可能无法有效编译)。这应该受益于 SIMD,例如SSE4.1 pmaxsd 执行 4 个并行 max 操作,因此这应该主要受 L3 缓存带宽的限制。

或者将线程之间的最大工作划分为第二个并行阶段,每个线程在输出数组的一部分上执行上述操作。或者让 thread_id % 4 == 0 从自身和接下来的 3 个线程中减少结果,所以如果你有一个有很多线程的系统,你就会有一个减少树。