C 和低级信号量实现
C & low-level semaphore implementation
我正在考虑如何使用尽可能少的 asm 代码来实现信号量(不是二进制)。
我没有成功地思考和编写它而不使用互斥体,所以这是我到目前为止能做的最好的:
全球:
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
typedef struct
{
atomic_ullong value;
pthread_mutex_t *lock_op;
bool ready;
} semaphore_t;
typedef struct
{
atomic_ullong value;
pthread_mutex_t lock_op;
bool ready;
} static_semaphore_t;
/* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}
函数:
bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{
if(semaphore->ready) if(!(semaphore->lock_op = \
calloc(1, sizeof(pthread_mutex_t)))) return false;
else pthread_mutex_destroy(semaphore->lock_op);
if(pthread_mutex_init(semaphore->lock_op, NULL))
return false;
semaphore->value = init_value;
semaphore->ready = true;
return true;
}
bool semaphore_wait(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
pthread_mutex_lock(&(semaphore->lock_op));
while(!semaphore->value) __asm__ __volatile__("nop");
(semaphore->value)--;
pthread_mutex_unlock(&(semaphore->lock_op));
return true;
}
bool semaphore_post(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
return true;
}
是否可以仅使用几行代码、原子内置函数或直接在汇编中(例如 lock cmpxchg
)来实现信号量?
查看 <semaphore.h>
包含的 <bits/sempahore.h>
中的 sem_t 结构,在我看来,它选择了一条非常不同的路径...
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
更新:
@PeterCordes 提出了一个绝对更好的解决方案,使用原子,没有互斥量,直接对信号量值进行检查。
我仍然想更好地了解利用内置暂停函数或内核调用的性能改进代码的机会,这些函数可以避免 CPU 浪费,等待关键资源可用。
如果能有一个标准的互斥锁和非二进制信号量实现来进行比较,那就太好了。
来自 futex(7) 我读到:"The Linux kernel provides futexes ("快速用户-space 互斥") 作为快速用户-space 锁定和信号量。Futexes 非常基础,非常适合构建更高级别的锁定抽象,例如互斥锁、条件变量、读写锁、屏障和信号量。"
请参阅我的可能有效的最小朴素信号量实现的部分内容。它编译并看起来适合 x86。我认为对于任何 C11 实现来说,它通常都是正确的。
IIRC,可以使用原子操作访问 implement a counting lock (aka semaphore) with just a single integer。维基百科 link 甚至给出了 up
/down
的算法。您不需要单独的互斥体。如果 atomic_ullong
需要一个互斥锁来支持目标 CPU 上的原子 increment/decrement,它将包含一个。 (这可能是 32 位 x86 上的情况,或者实现使用慢速 cmpxchg8
而不是快速 lock xadd
。32 位计数器对你的信号量来说真的太小了吗?因为 64 位原子在 32 位上会更慢机器。)
<bits/sempahore.h>
联合定义显然只是一个具有正确大小的不透明 POD 类型,并不表示实际实现。
正如@David Schwartz 所说,除非您是专家,否则实现自己的锁定以供实际使用是徒劳的。不过,这可能是一种了解原子操作并找出标准实现中隐藏内容的有趣方式。请仔细注意他的警告,即锁定实现很难测试。您可以使用当前版本的编译器和您选择的编译选项编写适用于硬件测试用例的代码...
ready
布尔值完全是 space 的浪费。如果您可以正确初始化 ready
标志,以便函数查看它是有意义的,那么您可以将其他字段初始化为正常的初始状态。
我注意到您的代码还有其他一些问题:
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};
static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value
使用动态分配的 pthread_mutex_t *lock_op
很愚蠢。使用值,而不是指针。大多数锁定函数都使用互斥锁,因此额外的间接级别只会减慢速度。如果内存与计数器一起存在会更好。互斥锁不需要很多 space.
while(!semaphore->value) __asm__ __volatile__("nop");
我们希望这个循环避免浪费能量和减慢其他线程,甚至其他逻辑线程与超线程共享同一个核心。
nop
不会减少忙等待循环的 CPU 强度。即使使用超线程,它在 x86 上也可能没有什么区别,因为整个循环体仍然可能适合 4 微指令,因此每个时钟迭代一次问题是否存在 nop
。 nop
不需要执行单元,所以至少它不会造成伤害。这个自旋循环发生在持有互斥锁的情况下,这看起来很愚蠢。所以第一个服务员将进入这个自旋循环,而之后的服务员将在互斥锁上自旋。
这是我对信号量的简单实现,仅使用 C11 原子操作
我认为这是一个很好的实现,它实现了正确和小型(源代码和机器代码)的非常有限的目标,并且不使用其他实际的锁定原语。有些主要领域我什至没有尝试解决(例如 fairness/starvation,将 CPU 交给其他线程,可能是其他东西)。
查看 asm output on godbolt:down
只有 12 个 x86 insn,up
有 2 个(包括 ret
s)。 Godbolt 的非 x86 编译器(ARM/ARM64/PPC 的 gcc 4.8)太旧,无法支持 C11 <stdatomic.h>
。 (不过他们确实有 C++ std::atomic
)。所以很遗憾,我无法轻松检查非 x86 上的 asm 输出。
#include <stdatomic.h>
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
typedef struct {
atomic_int val; // int is plenty big. Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;
#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); } // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif
void sem_down(naive_sem_t *sem)
{
while (1) {
while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
spinloop_body(); // wait for a the semaphore to be available
int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel ); // try to take the lock. Might only need mo_acquire
if (likely(tmp >= 1))
break; // we successfully got the lock
else // undo our attempt; another thread's decrement happened first
atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
}
}
// note the release, not seq_cst. Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}
这里的技巧是val
暂时太低也可以;这只会让其他线程旋转。还要注意 fetch_add
作为单个原子操作是关键 。它 returns 是旧值,因此我们可以检测到 val
何时在 while 循环的加载和 fetch_add 之间被另一个线程占用。 (请注意,我们不需要检查 tmp
是否 == 到 while 循环的负载:如果另一个线程 up
ed 负载和 fetch_add 之间的信号量就没问题了。这使用 fetch_add 而不是 cmpxchg 是一个好处。
atomic_load
自旋循环只是一种性能优化,优于让所有服务员在 val
上执行原子读取-修改-写入。 (尽管有许多服务员试图使用 inc 来 dec 然后撤消,但让服务员看到锁被解锁的情况可能非常罕见)。
真正的实现应该有针对更多平台的特殊内容,而不仅仅是 x86。对于 x86,可能不仅仅是旋转循环内的 PAUSE
指令。这仍然只是一个完全可移植的 C11 实现的玩具示例。 PAUSE
显然有助于避免对内存排序的错误推测,因此 CPU 运行 在离开自旋循环后更有效。 pause
与将逻辑 CPU 生成到 OS 不同的线程与 运行 不同。它也与 memory_order_???
参数的正确性和选择无关。
一个真正的实现可能会在一些旋转迭代后放弃 CPU 到 OS(sched_yield(2)
,或者更可能是 futex
系统调用, 见下文)。也许使用 x86 MONITOR
/ MWAIT
对超线程更加友好;我不确定。我从来没有真正实现过锁定自己,我只是在查找其他 insn 时在 x86 insn 参考中看到所有这些东西。
如前所述,x86 的 lock xadd
指令实现了 fetch_add
(具有顺序一致性语义,因为 lock
ed 指令始终是完整的内存屏障)。在非 x86 上,仅对 fetch_add 使用 acquire+release 语义,而不是完全顺序一致性可能允许更高效的代码。我不确定,但仅使用 acquire
很可能会在 ARM64 上实现更高效的代码。我想我们只需要 acquire
on the fetch_add, not acq_rel, but I'm not sure. On x86 there won't be any difference in code, since lock
ed instructions are the only way to do atomic read-modify-write, so even relaxed
will be the same as seq_cst
(except for compile-time reordering.)
如果你想产生 CPU 而不是旋转,你需要一个系统调用(正如人们所说)。显然,为了使标准库锁定在 Linux 上尽可能高效,已经进行了大量工作。有专门的系统调用可以帮助内核在释放锁时唤醒正确的线程,它们不易于使用。 From futex(7)
:
NOTES
To reiterate, bare futexes are not intended as an easy-to-use abstraction for end-users. (There is no wrapper function for this
system call
in glibc.) Implementors are expected to be assembly literate and to have read the sources of the futex user-space library
referenced below.
公平/饥饿(我天真的实现忽略了)
正如维基百科文章所提到的,某种唤醒队列是个好主意,这样同一个线程就不会每次都获取信号量。 (释放后快速获取锁的代码通常会让释放线程在其他线程仍在睡眠时获得锁)。
这是进程中内核协作的另一大好处(futex
)。
我正在考虑如何使用尽可能少的 asm 代码来实现信号量(不是二进制)。
我没有成功地思考和编写它而不使用互斥体,所以这是我到目前为止能做的最好的:
全球:
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
typedef struct
{
atomic_ullong value;
pthread_mutex_t *lock_op;
bool ready;
} semaphore_t;
typedef struct
{
atomic_ullong value;
pthread_mutex_t lock_op;
bool ready;
} static_semaphore_t;
/* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}
函数:
bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{
if(semaphore->ready) if(!(semaphore->lock_op = \
calloc(1, sizeof(pthread_mutex_t)))) return false;
else pthread_mutex_destroy(semaphore->lock_op);
if(pthread_mutex_init(semaphore->lock_op, NULL))
return false;
semaphore->value = init_value;
semaphore->ready = true;
return true;
}
bool semaphore_wait(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
pthread_mutex_lock(&(semaphore->lock_op));
while(!semaphore->value) __asm__ __volatile__("nop");
(semaphore->value)--;
pthread_mutex_unlock(&(semaphore->lock_op));
return true;
}
bool semaphore_post(semaphore_t *semaphore)
{
if(!semaphore->ready) return false;
atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
return true;
}
是否可以仅使用几行代码、原子内置函数或直接在汇编中(例如 lock cmpxchg
)来实现信号量?
查看 <semaphore.h>
包含的 <bits/sempahore.h>
中的 sem_t 结构,在我看来,它选择了一条非常不同的路径...
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
更新:
@PeterCordes 提出了一个绝对更好的解决方案,使用原子,没有互斥量,直接对信号量值进行检查。
我仍然想更好地了解利用内置暂停函数或内核调用的性能改进代码的机会,这些函数可以避免 CPU 浪费,等待关键资源可用。
如果能有一个标准的互斥锁和非二进制信号量实现来进行比较,那就太好了。
来自 futex(7) 我读到:"The Linux kernel provides futexes ("快速用户-space 互斥") 作为快速用户-space 锁定和信号量。Futexes 非常基础,非常适合构建更高级别的锁定抽象,例如互斥锁、条件变量、读写锁、屏障和信号量。"
请参阅我的可能有效的最小朴素信号量实现的部分内容。它编译并看起来适合 x86。我认为对于任何 C11 实现来说,它通常都是正确的。
IIRC,可以使用原子操作访问 implement a counting lock (aka semaphore) with just a single integer。维基百科 link 甚至给出了 up
/down
的算法。您不需要单独的互斥体。如果 atomic_ullong
需要一个互斥锁来支持目标 CPU 上的原子 increment/decrement,它将包含一个。 (这可能是 32 位 x86 上的情况,或者实现使用慢速 cmpxchg8
而不是快速 lock xadd
。32 位计数器对你的信号量来说真的太小了吗?因为 64 位原子在 32 位上会更慢机器。)
<bits/sempahore.h>
联合定义显然只是一个具有正确大小的不透明 POD 类型,并不表示实际实现。
正如@David Schwartz 所说,除非您是专家,否则实现自己的锁定以供实际使用是徒劳的。不过,这可能是一种了解原子操作并找出标准实现中隐藏内容的有趣方式。请仔细注意他的警告,即锁定实现很难测试。您可以使用当前版本的编译器和您选择的编译选项编写适用于硬件测试用例的代码...
ready
布尔值完全是 space 的浪费。如果您可以正确初始化 ready
标志,以便函数查看它是有意义的,那么您可以将其他字段初始化为正常的初始状态。
我注意到您的代码还有其他一些问题:
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};
static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value
使用动态分配的 pthread_mutex_t *lock_op
很愚蠢。使用值,而不是指针。大多数锁定函数都使用互斥锁,因此额外的间接级别只会减慢速度。如果内存与计数器一起存在会更好。互斥锁不需要很多 space.
while(!semaphore->value) __asm__ __volatile__("nop");
我们希望这个循环避免浪费能量和减慢其他线程,甚至其他逻辑线程与超线程共享同一个核心。
nop
不会减少忙等待循环的 CPU 强度。即使使用超线程,它在 x86 上也可能没有什么区别,因为整个循环体仍然可能适合 4 微指令,因此每个时钟迭代一次问题是否存在 nop
。 nop
不需要执行单元,所以至少它不会造成伤害。这个自旋循环发生在持有互斥锁的情况下,这看起来很愚蠢。所以第一个服务员将进入这个自旋循环,而之后的服务员将在互斥锁上自旋。
这是我对信号量的简单实现,仅使用 C11 原子操作
我认为这是一个很好的实现,它实现了正确和小型(源代码和机器代码)的非常有限的目标,并且不使用其他实际的锁定原语。有些主要领域我什至没有尝试解决(例如 fairness/starvation,将 CPU 交给其他线程,可能是其他东西)。
查看 asm output on godbolt:down
只有 12 个 x86 insn,up
有 2 个(包括 ret
s)。 Godbolt 的非 x86 编译器(ARM/ARM64/PPC 的 gcc 4.8)太旧,无法支持 C11 <stdatomic.h>
。 (不过他们确实有 C++ std::atomic
)。所以很遗憾,我无法轻松检查非 x86 上的 asm 输出。
#include <stdatomic.h>
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
typedef struct {
atomic_int val; // int is plenty big. Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;
#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); } // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif
void sem_down(naive_sem_t *sem)
{
while (1) {
while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
spinloop_body(); // wait for a the semaphore to be available
int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel ); // try to take the lock. Might only need mo_acquire
if (likely(tmp >= 1))
break; // we successfully got the lock
else // undo our attempt; another thread's decrement happened first
atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
}
}
// note the release, not seq_cst. Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}
这里的技巧是val
暂时太低也可以;这只会让其他线程旋转。还要注意 fetch_add
作为单个原子操作是关键 。它 returns 是旧值,因此我们可以检测到 val
何时在 while 循环的加载和 fetch_add 之间被另一个线程占用。 (请注意,我们不需要检查 tmp
是否 == 到 while 循环的负载:如果另一个线程 up
ed 负载和 fetch_add 之间的信号量就没问题了。这使用 fetch_add 而不是 cmpxchg 是一个好处。
atomic_load
自旋循环只是一种性能优化,优于让所有服务员在 val
上执行原子读取-修改-写入。 (尽管有许多服务员试图使用 inc 来 dec 然后撤消,但让服务员看到锁被解锁的情况可能非常罕见)。
真正的实现应该有针对更多平台的特殊内容,而不仅仅是 x86。对于 x86,可能不仅仅是旋转循环内的 PAUSE
指令。这仍然只是一个完全可移植的 C11 实现的玩具示例。 PAUSE
显然有助于避免对内存排序的错误推测,因此 CPU 运行 在离开自旋循环后更有效。 pause
与将逻辑 CPU 生成到 OS 不同的线程与 运行 不同。它也与 memory_order_???
参数的正确性和选择无关。
一个真正的实现可能会在一些旋转迭代后放弃 CPU 到 OS(sched_yield(2)
,或者更可能是 futex
系统调用, 见下文)。也许使用 x86 MONITOR
/ MWAIT
对超线程更加友好;我不确定。我从来没有真正实现过锁定自己,我只是在查找其他 insn 时在 x86 insn 参考中看到所有这些东西。
如前所述,x86 的 lock xadd
指令实现了 fetch_add
(具有顺序一致性语义,因为 lock
ed 指令始终是完整的内存屏障)。在非 x86 上,仅对 fetch_add 使用 acquire+release 语义,而不是完全顺序一致性可能允许更高效的代码。我不确定,但仅使用 acquire
很可能会在 ARM64 上实现更高效的代码。我想我们只需要 acquire
on the fetch_add, not acq_rel, but I'm not sure. On x86 there won't be any difference in code, since lock
ed instructions are the only way to do atomic read-modify-write, so even relaxed
will be the same as seq_cst
(except for compile-time reordering.)
如果你想产生 CPU 而不是旋转,你需要一个系统调用(正如人们所说)。显然,为了使标准库锁定在 Linux 上尽可能高效,已经进行了大量工作。有专门的系统调用可以帮助内核在释放锁时唤醒正确的线程,它们不易于使用。 From futex(7)
:
NOTES
To reiterate, bare futexes are not intended as an easy-to-use abstraction for end-users. (There is no wrapper function for this system call in glibc.) Implementors are expected to be assembly literate and to have read the sources of the futex user-space library referenced below.
公平/饥饿(我天真的实现忽略了)
正如维基百科文章所提到的,某种唤醒队列是个好主意,这样同一个线程就不会每次都获取信号量。 (释放后快速获取锁的代码通常会让释放线程在其他线程仍在睡眠时获得锁)。
这是进程中内核协作的另一大好处(futex
)。