C++:Signal/Slot 库中的线程安全

C++: Thread Safety in a Signal/Slot Library

我正在实现一个 Signal/Slot 框架,并且我希望它是线程安全的。我已经从 Boost 邮件列表中获得了很多支持,但由于这与 boost 无关,我将在这里提出我的悬而未决的问题。

什么时候 signal/slot 实现(或调用自身外部函数的任何框架,由用户以某种方式指定)被认为是线程安全的?应该是安全的w.r.t。它自己的数据,即与其实现细节相关的数据?或者它是否也应该考虑用户的数据,无论传递给框架的函数是什么,这些数据都可能会被修改,也可能不会被修改?

这是邮件列表中给出的示例(编辑:这是一个示例用例——即用户代码——。我的代码在 Emitter 对象):

int * somePtr = nullptr;
Emitter<Event> em; // just an object that can emit the 'Event' signal    

void mainThread()
{
    em.connect<Event>(someFunction);

    // now, somehow, 2 threads are created which, at some point
    // execute the thread1() and thread2() functions below
}

void someFunction()
{
    // can somePtr change after the check but before the set?
    if (somePtr)
        *somePtr = 17;
}

void cleanupPtr()
{
    // this looks safe, but compilers and CPUs can reorder this code:
    int *tmp = somePtr;
    somePtr = null;
    delete tmp;
}

void thread1()
{
    em.emit<Event>();
}

void thread2()
{
    em.disconnect<Event>(someFunction);
    // now safe to cleanup (?)
    cleanupPtr();
}

在上面的代码中,可能会发出Event,导致someFunction被执行。如果 somePtr 不是 null,但在 if 之后变成 null,但在赋值之前,我们就有麻烦了。从thread2的角度来看,这并不明显,因为它在调用cleanupPtr之前断开someFunction

我明白为什么这可能会导致麻烦,但这是谁的责任?我的图书馆是否应该保护用户不以各种不负责任但可以想象的方式使用它?

最后一题很简单。如果你说你的库是线程安全的,它应该是线程安全的。说它部分是线程安全的是没有意义的,或者,如果你不滥用它,它只是线程安全的。在那种情况下,您必须解释到底什么不是线程安全的。

现在回答您认为 someFunction 的第一个问题: 该操作是非原子的。这意味着 CPU 可以在 ifassigment 之间中断。那将会发生,我知道 :-) 另一个线程可以随时擦除指针。即使在两个简短而快速的陈述之间。

现在 cleanupPtr: 我不是编译器专家,但如果你想确保你的赋值发生在你用代码编写它的同一时刻,你应该在 somePtr 的声明前面写关键字 volatile。编译器现在将知道您在多线程情况下使用该属性,并且不会在 CPU 的寄存器中缓冲该值。

如果您的线程情况包含 reader 线程和编写器线程,关键字 volatile 可以(恕我直言)足以同步它们。只要您用来在线程之间交换信息的属性是通用的。 对于其他情况,您可以使用互斥或​​原子。我会给你一个互斥锁的例子。我为此使用 C++11,但它与使用 boost 的以前版本的 C++ 的工作方式类似。

使用互斥:

int * somePtr = nullptr;
Emitter<Event> em; // just an object that can emit the 'Event' signal    
std::recursive_mutex g_mutex;

void mainThread()
{
    em.connect<Event>(someFunction);

    // now, somehow, 2 threads are created which, at some point
    // execute the thread1() and thread2() functions below
}

void someFunction()
{
    std::lock_guard<std::recursive_mutex> lock(g_mutex);
    // can somePtr change after the check but before the set?
    if (somePtr)
        *somePtr = 17;
}

void cleanupPtr()
{
    std::lock_guard<std::recursive_mutex> lock(g_mutex);
    // this looks safe, but compilers and CPUs can reorder this code:
    int *tmp = somePtr;
    somePtr = null;
    delete tmp;
}

void thread1()
{
    em.emit<Event>();
}

void thread2()
{
    em.disconnect<Event>(someFunction);
    // now safe to cleanup (?)
    cleanupPtr();
}

我这里只添加了一个递归互斥锁,没有改变样本的任何其他代码,即使它现在是货物代码。 std 中有两种互斥量。完全无用的 std::mutexstd::recursive_mutex 就像您期望的互斥锁一样工作。 std::mutex 排除任何进一步调用的访问,即使是来自同一线程。如果需要互斥锁保护的方法调用使用相同互斥锁的 public 方法,就会发生这种情况。 std::recursive_mutex 对于同一个线程是可重入的。 原子(或 win32 中的互锁)是另一种方式,但仅用于在线程之间交换值或并发访问它们。您的示例缺少此类值,但在您的情况下,我会更深入地了解它们 (std::atomic)。

更新

如果您是开发人员未明确声明为线程安全的库的用户,请将其视为非线程安全并使用互斥锁屏蔽对它的每次调用。 坚持这个例子。如果你不能改变 someFunction 你必须像这样包装函数:

void threadsafeSomeFunction()
{
  std::lock_guard<std::recursive_mutex> lock(g_mutex);
  someFunction();
}

我怀疑没有明确的好答案,但是通过记录您希望对 Emitter 对象的并发访问做出的保证会更加清晰。

一级保证,对我来说是线程安全承诺所隐含的,是:

  • 对象上的并发操作保证使对象处于一致状态(至少,从访问线程的角度来看是这样。)
  • 非交换操作将按照某种(未知)顺序连续安排的方式执行。

那么问题是,emit 方法在语义上承诺的是什么:将控制权传递给连接的例程,还是对函数求值?如果是前者,那么你的工作听起来已经完成了;如果是后者,那么 'as-if ordered' 要求意味着您需要强制执行某种程度的同步。

图书馆的用户可以使用任何一种,只要明确承诺的内容即可。

首先是最简单的可能性:如果您不声明您的库是线程安全的,则不必为此操心。

(但即使)如果你这样做: 在您的示例中,用户必须注意线程安全,因为即使不使用您的事件系统,这两个函数也可能很危险(恕我直言,这是确定谁应该注意此类问题的好方法) .他在 C++11 中执行此操作的可能方法是:

#include <mutex>

// A mutex is used to control thread-acess to a shared resource
std::mutex _somePtr_mutex;

int* somePtr = nullptr;

void someFunction()
{
    /*
        Create a 'lock_guard' to manage your mutex.

        Is the mutex '_somePtr_mutex' already locked?
            Yes: Wait until it's unlocked.
            No: Lock it and continue execution.
    */
    std::lock_guard<std::mutex> lock(_somePtr_mutex);

    if(somePtr)
        *somePtr = 17;

    // End of scope: 'lock' gets destroyed and hence unlocks '_somePtr_mutex'
}

void cleanupPtr()
{
    /*
        Create a 'lock_guard' to manage your mutex.

        Is the mutex '_somePtr_mutex' already locked?
            Yes: Wait until it's unlocked.
            No: Lock it and continue execution.
    */
    std::lock_guard<std::mutex> lock(_somePtr_mutex);

    int *tmp = somePtr;
    somePtr = null;
    delete tmp;

    // End of scope: 'lock' gets destroyed and hence unlocks '_somePtr_mutex'
}