更新具有更大值的数组元素最少需要哪些内存屏障?
Which memory barriers are minimally needed for updating array elements with greater values?
在以下场景中,最低限度需要的内存屏障是什么?
多个线程并行更新数组的元素int a[n]
。
所有元素最初都设置为零。
每个线程为每个元素计算一个新值;然后,
它将计算出的新值与存储在数组中的现有值进行比较,
并且仅当新值大于存储值时才写入新值。
例如,如果一个线程为 a[0]
计算一个新值 5
,但是
a[0]
已经是 10
,那么线程不应该更新 a[0]
。
但是如果线程计算出一个新值 10
,并且 a[0]
是 5
,
那么线程必须更新 a[0]
.
新值的计算涉及一些共享的只读数据;
它根本不涉及数组。
虽然上述线程是运行,但没有其他线程访问该数组。
在保证所有线程完成更新之后,稍后使用该数组。
实现使用比较和交换循环,包装元素
进入 atomic_ref
(来自 Boost 或来自 C++20):
for (int k = 0; k != n; ++k) // For each element of the array
{
// Locally in this thread, compute the new value for a[k].
int new_value = ComputeTheNewValue(k);
// Establish atomic access to a[k].
atomic_ref<int> memory(a[k]);
// [Barrier_0]: Read the existing value.
int existing_value = memory.load(memory_order_relaxed);
while (true) // The compare-and-swap loop.
{
// Overwrite only with higher values.
if (new_value <= existing_value)
break;
// Need to update a[k] with the higher value "new_value", but
// only if a[k] still stores the "existing_value".
if (memory.compare_exchange_weak(existing_value, new_value,
/*Barrier_1*/ memory_order_relaxed,
/*Barrier_2*/ memory_order_relaxed))
{
// a[k] was still storing "existing_value", and it has been
// successfully updated with the higher "new_value".
// We're done, and we may exit the compare-and-swap loop.
break;
}
else
{
// We get here in two cases:
// 1. a[k] was found to store a value different from "existing_value", or
// 2. the compare-and-swap operation has failed spuriously.
// In the first case, the new value stored in a[k] has been loaded
// by compare_exchange_weak() function into the "existing_value" variable.
// Then, we need to compare the "new_value" produced by this thread
// with the newly loaded "existing_value". This is achieved by simply continuing the loop.
// The second case (the spurious failure) is also handled by continuing the loop,
// although in that case the "new_value <= existing_value" comparison is redundant.
continue;
}
}
}
这段代码涉及三个内存屏障:
Barrier_0
在 memory.load()
.
Barrier_1
,当compare_exchange_weak()
成功时用于read-modify-write。
Barrier_2
,当compare_exchange_weak()
失败时用于加载操作。
在这种情况下,代码是否保证只更新更高的值
当所有三个障碍都设置为 relaxed
时?
如果不是,需要什么最小障碍来保证正确的行为?
轻松就好,不需要点单。在更新过程中访问任何其他元素。而对于同一位置的访问,ISO C++保证每个位置分别存在一个“修改顺序”,即使 relaxed
操作也只会看到加载或加载之间位置修改顺序中相同或更晚的值RM星期三
您只是在 CAS 重试循环之外构建原子 fetch_max
原语。由于其他作者都在做同样的事情,所以每个位置的值都是单调递增的。因此,只要您看到大于 new_value
.
的值,就可以完全安全地退出。
为了让主线程在最后收集结果,您确实需要 release/acquire 同步,例如 thread.join
或某种标志。 (例如,可能 fetch_sub(1, release)
有多少线程还有工作要做的计数器,或者 done
标志数组,这样你就可以做一个纯粹的存储。)
顺便说一句,这似乎很慢,等待缓存行在内核之间反弹需要花费大量时间。 (很多false-sharing。)理想情况下,您可以有效地更改它以使每个线程在数组的不同部分上工作(例如,为同一索引计算多个候选对象,因此它不需要任何原子内容)。
I cannot guarantee that the computed indices do not overlap. In practice, the overlapping is usually small, but it cannot be eliminated.
很显然这是一个否定。如果不同线程接触的索引位于不同的缓存行(16 int32_t
的块),那么就不会有太多的错误共享。 (此外,如果计算成本很高,所以您不能非常快地生成值,那很好,所以原子更新不是您的代码花费大部分时间的地方。)
但是如果 存在 显着的争用并且数组不是很大,您可以为每个线程提供自己的输出数组 ,并且最后收集结果。例如让一个线程为每个循环执行 a[i] = max(a[i], b[i], c[i], d[i])
4 到 8 个数组。 (不是太多一次读取流,也不是可变数量的输入,因为这可能无法有效编译)。这应该受益于 SIMD,例如SSE4.1 pmaxsd
执行 4 个并行 max
操作,因此这应该主要受 L3 缓存带宽的限制。
或者将线程之间的最大工作划分为第二个并行阶段,每个线程在输出数组的一部分上执行上述操作。或者让 thread_id % 4 == 0
从自身和接下来的 3 个线程中减少结果,所以如果你有一个有很多线程的系统,你就会有一个减少树。
在以下场景中,最低限度需要的内存屏障是什么?
多个线程并行更新数组的元素int a[n]
。
所有元素最初都设置为零。
每个线程为每个元素计算一个新值;然后,
它将计算出的新值与存储在数组中的现有值进行比较,
并且仅当新值大于存储值时才写入新值。
例如,如果一个线程为 a[0]
计算一个新值 5
,但是
a[0]
已经是 10
,那么线程不应该更新 a[0]
。
但是如果线程计算出一个新值 10
,并且 a[0]
是 5
,
那么线程必须更新 a[0]
.
新值的计算涉及一些共享的只读数据; 它根本不涉及数组。
虽然上述线程是运行,但没有其他线程访问该数组。 在保证所有线程完成更新之后,稍后使用该数组。
实现使用比较和交换循环,包装元素
进入 atomic_ref
(来自 Boost 或来自 C++20):
for (int k = 0; k != n; ++k) // For each element of the array
{
// Locally in this thread, compute the new value for a[k].
int new_value = ComputeTheNewValue(k);
// Establish atomic access to a[k].
atomic_ref<int> memory(a[k]);
// [Barrier_0]: Read the existing value.
int existing_value = memory.load(memory_order_relaxed);
while (true) // The compare-and-swap loop.
{
// Overwrite only with higher values.
if (new_value <= existing_value)
break;
// Need to update a[k] with the higher value "new_value", but
// only if a[k] still stores the "existing_value".
if (memory.compare_exchange_weak(existing_value, new_value,
/*Barrier_1*/ memory_order_relaxed,
/*Barrier_2*/ memory_order_relaxed))
{
// a[k] was still storing "existing_value", and it has been
// successfully updated with the higher "new_value".
// We're done, and we may exit the compare-and-swap loop.
break;
}
else
{
// We get here in two cases:
// 1. a[k] was found to store a value different from "existing_value", or
// 2. the compare-and-swap operation has failed spuriously.
// In the first case, the new value stored in a[k] has been loaded
// by compare_exchange_weak() function into the "existing_value" variable.
// Then, we need to compare the "new_value" produced by this thread
// with the newly loaded "existing_value". This is achieved by simply continuing the loop.
// The second case (the spurious failure) is also handled by continuing the loop,
// although in that case the "new_value <= existing_value" comparison is redundant.
continue;
}
}
}
这段代码涉及三个内存屏障:
Barrier_0
在memory.load()
.Barrier_1
,当compare_exchange_weak()
成功时用于read-modify-write。Barrier_2
,当compare_exchange_weak()
失败时用于加载操作。
在这种情况下,代码是否保证只更新更高的值
当所有三个障碍都设置为 relaxed
时?
如果不是,需要什么最小障碍来保证正确的行为?
轻松就好,不需要点单。在更新过程中访问任何其他元素。而对于同一位置的访问,ISO C++保证每个位置分别存在一个“修改顺序”,即使 relaxed
操作也只会看到加载或加载之间位置修改顺序中相同或更晚的值RM星期三
您只是在 CAS 重试循环之外构建原子 fetch_max
原语。由于其他作者都在做同样的事情,所以每个位置的值都是单调递增的。因此,只要您看到大于 new_value
.
为了让主线程在最后收集结果,您确实需要 release/acquire 同步,例如 thread.join
或某种标志。 (例如,可能 fetch_sub(1, release)
有多少线程还有工作要做的计数器,或者 done
标志数组,这样你就可以做一个纯粹的存储。)
顺便说一句,这似乎很慢,等待缓存行在内核之间反弹需要花费大量时间。 (很多false-sharing。)理想情况下,您可以有效地更改它以使每个线程在数组的不同部分上工作(例如,为同一索引计算多个候选对象,因此它不需要任何原子内容)。
I cannot guarantee that the computed indices do not overlap. In practice, the overlapping is usually small, but it cannot be eliminated.
很显然这是一个否定。如果不同线程接触的索引位于不同的缓存行(16 int32_t
的块),那么就不会有太多的错误共享。 (此外,如果计算成本很高,所以您不能非常快地生成值,那很好,所以原子更新不是您的代码花费大部分时间的地方。)
但是如果 存在 显着的争用并且数组不是很大,您可以为每个线程提供自己的输出数组 ,并且最后收集结果。例如让一个线程为每个循环执行 a[i] = max(a[i], b[i], c[i], d[i])
4 到 8 个数组。 (不是太多一次读取流,也不是可变数量的输入,因为这可能无法有效编译)。这应该受益于 SIMD,例如SSE4.1 pmaxsd
执行 4 个并行 max
操作,因此这应该主要受 L3 缓存带宽的限制。
或者将线程之间的最大工作划分为第二个并行阶段,每个线程在输出数组的一部分上执行上述操作。或者让 thread_id % 4 == 0
从自身和接下来的 3 个线程中减少结果,所以如果你有一个有很多线程的系统,你就会有一个减少树。