C++ 信号量(半 *lockfree*),我从哪里得到一个?

C++ semaphore (semi *lockfree*), where do I get one?

编辑:这不是 post() 中允许互斥锁定的任何问题的重复。请仔细阅读,我需要一个lockfree post()!如果您没有真正的答案,请不要将此标记为重复。

信号量(如 linux)是一个有用的构建块,在 c++ 标准中找不到,在 boost(当前)中也没有。我主要是通过抢占式调度程序谈论单个进程的线程之间的信号量。

我特别感兴趣的是它们是非阻塞的(即无锁),除非它确实需要阻塞。也就是说,post() 和 try_wait() 应该始终是无锁的。并且 wait() 调用应该是无锁的,如果它们的调用强烈发生在足够多的 post() 返回之后。 此外,阻塞 wait() 应该被调度程序阻塞而不是自旋锁定。 如果我还想要一个带超时的 wait_for 怎么办 - 它会使实施进一步复杂化多少,同时仍能避免饥饿?

信号量不在标准中有什么原因吗?

Edit3:所以,我不知道有一个针对标准 P0514R4 的提案可以准确地处理这些问题,并且可以解决这里提出的所有问题,除了特别添加 std::semaphore。 http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0514r4.pdf

boost 也没有这些。具体来说,进程间的是自旋锁。

哪些库支持这样的东西?

是否可以在 windows api 和其他广泛使用的系统上实现它?

编辑:不可能使用 atomics+mutex+condition_variable 实现无锁 - 你要么必须阻塞 post 要么自旋等待。如果你想要一个无锁 post(),你不能在 post() 中锁定一个互斥体。我想 运行 在一个可能抢占的调度程序上,我不希望 post() 被其他线程阻塞,这些线程占用了互斥锁并被抢占。 所以,这 不是 C++0x has no semaphores? How to synchronize threads?

这样的问题的重复

编辑2: 下面的示例实现只是为了演示使用 atomics+mutex+condvar AFAIK 可以做到的最好。 post() 和 wait() 执行一次无锁操作 compare_exchange,并且仅在必须时才锁定互斥体。

但是 post() 不是无锁的。更糟糕的是,它可能会被锁定互斥锁并被抢占的 wait() 阻塞。

为简单起见,我只实现了post_one() 和wait_one_for(Duration),而不是post(int) 和wait_for(int,Duration)。此外,我假设标准未承诺的无虚假唤醒。

class semaphore //provides acquire release memory ordering for the user
{
private:
    using mutex_t = std::mutex;
    using unique_lock_t = std::unique_lock<mutex_t>;
    using condvar_t = std::condition_variable;
    using counter_t = int;

    std::atomic<counter_t> atomic_count_; 
    mutex_t mutex_;
    condvar_t condvar_;
    counter_t posts_notified_pending_;
    counter_t posts_unnotified_pending_;
    counter_t waiters_running_;
    counter_t waiters_aborted_pending_;

public:
    void post_one()
    {
        counter_t start_count = atomic_count_.fetch_add(+1, mo_acq_rel);
        if (start_count < 0) {
            unique_lock_t lock(mutex_);
            if (0 < waiters_running_) {
                ++posts_notified_pending_;
                condvar_.notify_one();
            }
            else {
                if (0 == waiters_aborted_pending_) {
                    ++posts_unnotified_pending_;
                }
                else {
                    --waiters_aborted_pending_;
                }
            }
        }
    }

    template< typename Duration >
    bool wait_one_for(Duration timeout)
    {
        counter_t start_count = atomic_count_.fetch_add(-1, mo_acq_rel);
        if (start_count <= 0) {
            unique_lock_t a_lock(mutex_);

            ++waiters_running_;
            BOOST_SCOPE_EXIT(&waiters_running_) {
                --waiters_running_;
            } BOOST_SCOPE_EXIT_END

            if( ( 0 == posts_notified_pending_ ) && ( 0 < posts_unnotified_pending_ ) ) {
                --posts_unnotified_pending_;
                return true;
            }
            else {

                auto wait_result = condvar_.wait_for( a_lock, timeout);
                switch (wait_result) {
                case std::cv_status::no_timeout: {
                    --posts_notified_pending_;
                    return true;
                } break;
                case std::cv_status::timeout: {

                    counter_t abort_count = atomic_count_.fetch_add(+1, mo_acq_rel);
                    if (abort_count >= 0) {
                        /*too many post() already increased a negative atomic_count_ and will try to notify, let them know we aborted. */
                        ++waiters_aborted_pending_;
                    }

                    return false;
                } break;
                default: assert(false); return false;
                }
            }
        }
        return true;
    }


    bool try_wait_one()
    {
        counter_t count = atomic_count_.load( mo_acquire );
        while (true) {
            if (count <= 0) {
                return false;
            }
            else if (atomic_count_.compare_exchange_weak(count, count-1, mo_acq_rel, mo_relaxed )) {
                return true;
            }
        }
    }
};

是的,只要您的操作系统提供合适的 "park" 和 "unpark" 机制,您就可以执行此操作,该机制不需要锁定即可解锁。 Park 是指允许线程进入休眠状态(OS 阻塞),unpark 是指唤醒该线程。

您已经接近原子计数器和 condvar 方法。问题是 condvar a mutex 是 required 作为语义的一部分。所以你必须放弃 condvars 并走低一点。首先,您应该将所有状态(例如当前信号量值、是否有任何等待者(以及可能有多少等待者)打包成单个原子值,并通过比较和交换以原子方式对其进行操作。这可以防止将这些作为单独的值时发生的竞争。

然后你可以绘制一个状态图,显示信号量的所有可能状态,以及所有可能转换状态的边(例如,"no waiters" 状态将转换为 "yes waiters" 状态时服务员来了)。您使用比较和交换实现所有转换,每当它失败时您必须重新计算转换,因为它可能已经改变!

那你只需要实现拦截即可。在 Windows 上,您将使用 Events - 自动或手动重置。两者都有其优点和怪癖,给这只猫剥皮的方法不止一种。例如,您可能可以让它与 单个 共享事件和自动重置事件一起工作。

但是,这里是一种机制的草图,它在无锁队列中使用每线程等待程序对象。信号量由一个原子操作的控制字和一个无锁列表组成,元素类型为 waiter_node 或堆栈或您想要使用的任何现成的类似并发列表的东西。

我们假设每个线程都拥有一个 waiter_node 对象,该对象只包含一个手动重置事件对象。这可以创建一次并存储在 TLS 中(可能是最有效的),或者在每次需要等待时按需分配,并在等待完成时取消分配。

基本大纲如下:

等等

  • 如果信号量可用(正),CAS 将其递减并继续。
  • 如果信号量不可用(零),线程在其 waiter_node 上调用 ResetEvent,然后将事件推送到等待者列表,检查 sem 值是否仍然为零,然后在 waiter_node 上调用 WaitForObject。当 returns 时,从顶部开始等待例程。

Post

  • 增加控制字。弹出一个 waiter_node,如果有的话,然后调用 SetEvent

这里有各种各样的 "races",例如 waiter_node 在等待线程甚至休眠之前被 post 操作弹出,但它们应该是良性的。

甚至在这个基于服务员队列的设计上也有很多变体。例如,您可以整合列表 "head" 和控制词,使它们成为同一事物。然后 wait 不需要再次检查信号量计数,因为推送操作会同时验证信号量状态。您还可以实现 "direct handoff",如果有服务员,posting 线程根本不会增加控制字,而只是弹出一个并用它已成功获取信号量的信息唤醒它。

在 Linux 上,您将 Event 替换为 futex。由于 futex 允许在内核内部进行原子检查和阻止操作,因此可以更轻松地实现 "single futex" 解决方案,从而避免了 Event 解决方案中固有的许多竞争。所以一个基本的草图是一个单一的控制字,你用 CAS 原子地进行转换,然后使用 futex()FUTEX_WAIT 对控制字进行第二次检查并原子地阻止(这个原子检查-and-sleep 是 futex) 的力量。