weak_ptr 单例不是线程安全的

weak_ptr to singleton not thread-safe

我正在编写一个 returns 一个 shared_ptr 到单例的函数。当所有引用都消失时,我希望单例对象被销毁。我的解决方案基于使用静态 weak_ptrmutexthis accepted answer,但我对其线程安全性的测试并没有始终如一地通过。

这里有一个完整的例子来说明问题。

#include <gtest/gtest.h>
#include <atomic>
#include <mutex>
#include <memory>
#include <vector>
#include <thread>

using namespace std;

// Number of instances of this class are tracked in a static counter.
// Used to verify the get_singleton logic (below) is correct.
class CountedObject {
private:
    static atomic<int> instance_counter;

public:
    CountedObject() {
        int prev_counter = instance_counter.fetch_add(1);
        if (prev_counter != 0)
            // Somehow, 2 objects exist at the same time. Why?
            throw runtime_error("Constructed " + to_string(prev_counter + 1) +
                                " counted objects");
    }
    ~CountedObject() {
        instance_counter.fetch_sub(1);
    }
    static int count() {
        return instance_counter.load();
    }
};

atomic<int> CountedObject::instance_counter{0};

// Returns reference to a singleton that gets destroyed when all references
// are destroyed.
template <typename T>
std::shared_ptr<T> get_singleton() {
    static mutex mtx;
    static weak_ptr<T> weak;
    scoped_lock lk(mtx);
    shared_ptr<T> shared = weak.lock();
    if (!shared) {
        shared.reset(new T, [](T* ptr){ 
            scoped_lock lk(mtx);
            delete ptr;
        });
        weak = shared;
    }
    return shared;
}

// This test passes consistently.
TEST(GetSingletonTest, SingleThreaded) {
    ASSERT_EQ(CountedObject::count(), 0);

    auto ref1 = get_singleton<CountedObject>();
    auto ref2 = get_singleton<CountedObject>();
    ASSERT_EQ(CountedObject::count(), 1);

    ref1.reset();
    ASSERT_EQ(CountedObject::count(), 1);
    ref2.reset();
    ASSERT_EQ(CountedObject::count(), 0);
}

// This test does NOT pass consistently.
TEST(GetSingletonTest, MultiThreaded) {
    const int THREAD_COUNT = 2;
    const int ITERS = 1000;
    vector<thread> threads;
    for (int i = 0; i < THREAD_COUNT; ++i)
        threads.emplace_back([ITERS]{
            // Repeatedly obtain and release references to the singleton.
            // The invariant must hold that at most one instance ever exists
            // at a time.
            for (int j = 0; j < ITERS; ++j) {
                auto local_ref = get_singleton<CountedObject>();
                local_ref.reset();
            }
        });
    for (auto& t : threads)
        t.join();
}

在我的系统(ARM64 Linux, g++ 7.5.0)上,多线程测试通常会失败:

[==========] Running 2 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 2 tests from GetSingletonTest
[ RUN      ] GetSingletonTest.SingleThreaded
[       OK ] GetSingletonTest.SingleThreaded (0 ms)
[ RUN      ] GetSingletonTest.MultiThreaded
terminate called after throwing an instance of 'std::runtime_error'
  what():  Constructed 2 counted objects
Aborted (core dumped)

为了简洁起见,我从代码中省略了 cout 消息,但我单独添加了消息来调试正在发生的事情:

[thread id,  message]
...
547921465808 Acquired lock
547921465808 Weak ptr expired, constructing
547921465808 Releasing lock
547929858512 Acquired lock
547929858512 Weak ptr expired, constructing
terminate called after throwing an instance of 'std::runtime_error'
  what():  Constructed 2 counted objects
Aborted (core dumped)

看来线程1判断单例已经过期,重新构造了单例。然后线程 2 唤醒并且 确定单例已经过期,即使线程 1 刚刚重新填充它 - 就好像线程 2 正在使用“过时”版本弱指针。

如何使线程 1 对 weak 的赋值立即对线程 2 可见?这可以用 C++20 的 atomic<weak_ptr<T>> 实现吗?我的项目仅限于 C++17,所以不幸的是这不是我的选择。

感谢您的帮助!

锁定下的赋值对检查weak_ptr锁定下的任何其他线程都是可见的。

存在多个实例的原因是析构函数的操作顺序不能保证。 !weak.lock() 不等同于“可以肯定,没有这个对象的实例”。相当于“肯定有这个对象的一个​​实例”的倒数。可能会有这样的例子,因为引用计数在删除者有机会采取行动之前就已经递减了。

想象一下这个序列:

  • 线程 1:~shared_ptr 被调用。
  • 线程1:强引用计数递减。
  • 线程 1:它比较等于零,删除器开始。
  • 线程 2:get_singleton 被调用。
  • 线程2:获取锁
  • 线程2:!weak.lock()满足,构造新实例
  • 线程 1:...同时,我们正在等待获取锁...
  • 线程2:构造新实例,赋值,释放锁
  • 线程1:删除者现在可以获取锁,删除原来的实例。

编辑:

为此,您需要一些指示不存在实例的指示器。只有在实例被销毁后才必须重置。其次,如果遇到实例存在但正在被销毁的情况,您需要决定如何处理:

template <typename T>
std::shared_ptr<T> get_singleton() {
    static mutex mtx;
    static weak_ptr<T> weak;
    static bool instance;

    while (true) {
        scoped_lock lk(mtx);

        shared_ptr<T> shared = weak.lock();
        if (shared)
            return shared;

        if (instance)
            // allow deleters to make progress and try again
            continue;

        instance = true;
        shared.reset(new T, [](T* ptr){ 
            scoped_lock lk(mtx);
            delete ptr;
            instance = false;
        });
        weak = shared;
        return shared;
    }
}

换句话说,状态机中有这些状态:

  1. 不存在T
  2. 正在构建T,但还不能访问
  3. T 存在并且可以访问
  4. T正在被销毁,无法再访问

bool 让我们知道我们处于状态 #4。一种方法是循环直到我们不再处于状态#4。我们放下锁,以便删除者可以取得进展。当我们重新获取锁时,另一个线程可能已经突袭并创建了实例,因此我们需要从顶部开始 re-check 一切。

非常感谢 Jeff 的解释和回答。我接受他的回答,但我想 post 一个替代实现,在我的(公认的有限的)测试中,它表现得稍微好一些。您的里程可能会有所不同。

template <typename T>
std::shared_ptr<T> get_singleton() {
    static mutex mtx;
    static weak_ptr<T> weak;
    static atomic<bool> deleted{true};
    scoped_lock lk(mtx);
    shared_ptr<T> shared = weak.lock();
    if (!shared) {
        // The refcount is 0, but is the object actually deleted yet?
        // Spin until it is.
        bool expected;
        do {
            expected = true;
        } while (!deleted.compare_exchange_weak(expected, false));

        // The previous object is definitely deleted. Ready to make a new one.
        shared.reset(new T, [](T* ptr){ 
            delete ptr;
            deleted.store(true);
        });
        weak = shared;
    }
    return shared;
}

这会从删除器中移除锁定获取,并用原子标志替换它,当对象实际被删除时该标志变为真。