Spinning thread barrier using Atomic Builtins
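I'm trying to implement a spinning thread barrier using atomics, specifically __sync_fetch_and_add. https://gcc.gnu.org/onlinedocs/gcc-4.4.5/gcc/Atomic-Builtins.html
I basically want an alternative to the pthread barrier. I'm using Ubuntu on a system that can run about a hundred threads in parallel.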
int bar = 0; //global variable
int P = MAX_THREADS; //number of threads
__sync_fetch_and_add(&bar,1); //each thread comes and adds atomically
while(bar<P){} //threads spin until bar increments to P
bar=0; //a thread sets bar=0 to be used in the next spinning barrier
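This doesn't work, for obvious reasons: one thread may set bar=0 while another is still stuck in the while loop, which then never terminates. I have seen an implementation here: Writing a (spinning) thread barrier using c++11 atomics, but it looks too complicated, and I think its performance might be worse than the pthread barrier.
This implementation is also expected to generate more traffic in the memory hierarchy, because the cache line holding bar ping-pongs between threads.
Any ideas on how to build a simple barrier with these atomic instructions? A communication-optimized scheme would also be helpful.
Rather than spinning on the counter of arrived threads, it is better to spin on the number of barriers passed, which is incremented only by the last thread to reach the barrier. This also reduces cache pressure, because the variable being spun on is now updated by a single thread.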
int P = MAX_THREADS;
int bar = 0;             // Number of threads that have reached the barrier.
volatile int passed = 0; // Number of barriers passed by all threads.

void barrier_wait()
{
    int passed_old = passed; // Must be read before incrementing *bar*!

    if (__sync_fetch_and_add(&bar, 1) == (P - 1))
    {
        // The last thread to reach the barrier.
        bar = 0;
        // *bar* must be reset strictly before the barrier counter is updated.
        __sync_synchronize();
        passed++; // Mark the barrier as passed.
    }
    else
    {
        // Not the last thread. Wait for the others.
        while (passed == passed_old) {}
        // Synchronize with the writes of the other threads that passed the barrier.
        __sync_synchronize();
    }
}
Note that the variable being spun on must be declared with the volatile qualifier.
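The `volatile` qualifier only keeps the compiler from caching `passed` in a register; the language standards do not treat it as an atomic access. As a hedged alternative, assuming GCC 4.7 or later (or a Clang that provides the `__atomic` builtins), the same barrier can be sketched with explicit atomic loads and stores. The names `SpinBarrier`, `spin_barrier_wait` and `run_rounds` are made up for this sketch and do not come from the answer; `run_rounds` is only a small reuse check.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper type for this sketch; field names mirror the answer.
struct SpinBarrier {
    int P;       // number of participating threads
    int bar;     // threads that have arrived in the current round
    int passed;  // number of completed rounds
};

void spin_barrier_wait(SpinBarrier *b)
{
    // Read the round number before announcing arrival.
    int passed_old = __atomic_load_n(&b->passed, __ATOMIC_RELAXED);

    if (__atomic_fetch_add(&b->bar, 1, __ATOMIC_ACQ_REL) == b->P - 1) {
        // Last arrival: reset the counter, then publish the new round.
        __atomic_store_n(&b->bar, 0, __ATOMIC_RELAXED);
        __atomic_store_n(&b->passed, passed_old + 1, __ATOMIC_RELEASE);
    } else {
        // The acquire load pairs with the release store above.
        while (__atomic_load_n(&b->passed, __ATOMIC_ACQUIRE) == passed_old) {
        }
    }
}

// Runs `threads` workers for `rounds` rounds and returns true if, after each
// barrier, every worker saw that all peers had reached the same round.
bool run_rounds(int threads, int rounds)
{
    assert(threads > 0 && rounds >= 0);
    SpinBarrier b = {threads, 0, 0};
    std::vector<int> progress(threads, 0);
    std::vector<int> ok(threads, 1);
    std::vector<std::thread> pool;

    for (int i = 0; i < threads; ++i) {
        pool.emplace_back([&, i] {
            for (int r = 1; r <= rounds; ++r) {
                progress[i] = r;
                spin_barrier_wait(&b);  // everyone has written round r
                for (int j = 0; j < threads; ++j)
                    if (progress[j] != r) ok[i] = 0;
                spin_barrier_wait(&b);  // everyone has finished reading
            }
        });
    }
    for (std::thread &t : pool) t.join();

    for (int v : ok)
        if (!v) return false;
    return true;
}
```

Calling `run_rounds(3, 20)` exercises the barrier 40 times. Because the waiters spin without yielding, the check is slow when there are more threads than cores.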
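The C++ code can be somewhat faster than the C version, because it can use acquire/release memory barriers instead of a full barrier, which is the only kind the __sync functions provide: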
#include <atomic>

int P = MAX_THREADS;
std::atomic<int> bar{0};    // Number of threads that have reached the barrier.
std::atomic<int> passed{0}; // Number of barriers passed by all threads.

void barrier_wait()
{
    int passed_old = passed.load(std::memory_order_relaxed);

    if (bar.fetch_add(1) == (P - 1))
    {
        // The last thread to reach the barrier.
        bar = 0;
        // Synchronize and store in one operation.
        passed.store(passed_old + 1, std::memory_order_release);
    }
    else
    {
        // Not the last thread. Wait for the others.
        while (passed.load(std::memory_order_relaxed) == passed_old) {}
        // Synchronize with the writes of the other threads that passed the barrier.
        std::atomic_thread_fence(std::memory_order_acquire);
    }
}
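The cache benefit of spinning on `passed` depends on `bar` and `passed` not sharing a cache line; otherwise every arriving thread's `fetch_add` still invalidates the line the waiters are polling. Below is a minimal sketch of the same logic wrapped in a class that keeps the two counters on separate lines. The class name `PaddedSpinBarrier`, the helper `phased_sum`, and the 64-byte line size are assumptions for illustration, not part of the answer.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical wrapper around the answer's logic. 64 is an assumed
// cache-line size; check the target CPU before relying on it.
class PaddedSpinBarrier {
public:
    explicit PaddedSpinBarrier(int threads) : P(threads), bar(0), passed(0) {}

    void wait()
    {
        int passed_old = passed.load(std::memory_order_relaxed);
        if (bar.fetch_add(1) == P - 1) {
            // Last arrival: reset the counter, then release the waiters.
            bar.store(0);
            passed.store(passed_old + 1, std::memory_order_release);
        } else {
            while (passed.load(std::memory_order_relaxed) == passed_old) {
            }
            std::atomic_thread_fence(std::memory_order_acquire);
        }
    }

private:
    const int P;
    alignas(64) std::atomic<int> bar;     // written by every arriving thread
    alignas(64) std::atomic<int> passed;  // written only by the last arrival
};

// Two-phase usage example: each thread publishes one value, waits at the
// barrier, then sums the values published by all threads.
std::vector<int> phased_sum(int threads)
{
    PaddedSpinBarrier barrier(threads);
    std::vector<int> data(threads, 0);
    std::vector<int> sums(threads, 0);
    std::vector<std::thread> pool;

    for (int i = 0; i < threads; ++i) {
        pool.emplace_back([&, i] {
            data[i] = i + 1;   // phase 1: publish own value
            barrier.wait();    // all values are written past this point
            int total = 0;
            for (int v : data) total += v;  // phase 2: read everyone's value
            sums[i] = total;
        });
    }
    for (std::thread &t : pool) t.join();
    return sums;
}
```

With four threads, every entry of the vector returned by `phased_sum(4)` is 1 + 2 + 3 + 4 = 10. Whether the padding pays off in practice depends on the machine and thread count, so it is worth measuring against the unpadded version.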