weak_ptr 单例不是线程安全的
weak_ptr to singleton not thread-safe
我正在编写一个 returns 一个 shared_ptr 到单例的函数。当所有引用都消失时,我希望单例对象被销毁。我的解决方案基于使用静态 weak_ptr
和 mutex
的 this accepted answer,但我对其线程安全性的测试并没有始终如一地通过。
这里有一个完整的例子来说明问题。
#include <gtest/gtest.h>
#include <atomic>
#include <mutex>
#include <memory>
#include <vector>
#include <thread>
using namespace std;
// Number of instances of this class are tracked in a static counter.
// Used to verify the get_singleton logic (below) is correct.
class CountedObject {
private:
static atomic<int> instance_counter;
public:
CountedObject() {
int prev_counter = instance_counter.fetch_add(1);
if (prev_counter != 0)
// Somehow, 2 objects exist at the same time. Why?
throw runtime_error("Constructed " + to_string(prev_counter + 1) +
" counted objects");
}
~CountedObject() {
instance_counter.fetch_sub(1);
}
static int count() {
return instance_counter.load();
}
};
atomic<int> CountedObject::instance_counter{0};
// Returns reference to a singleton that gets destroyed when all references
// are destroyed.
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (!shared) {
shared.reset(new T, [](T* ptr){
scoped_lock lk(mtx);
delete ptr;
});
weak = shared;
}
return shared;
}
// This test passes consistently.
TEST(GetSingletonTest, SingleThreaded) {
ASSERT_EQ(CountedObject::count(), 0);
auto ref1 = get_singleton<CountedObject>();
auto ref2 = get_singleton<CountedObject>();
ASSERT_EQ(CountedObject::count(), 1);
ref1.reset();
ASSERT_EQ(CountedObject::count(), 1);
ref2.reset();
ASSERT_EQ(CountedObject::count(), 0);
}
// This test does NOT pass consistently.
TEST(GetSingletonTest, MultiThreaded) {
const int THREAD_COUNT = 2;
const int ITERS = 1000;
vector<thread> threads;
for (int i = 0; i < THREAD_COUNT; ++i)
threads.emplace_back([ITERS]{
// Repeatedly obtain and release references to the singleton.
// The invariant must hold that at most one instance ever exists
// at a time.
for (int j = 0; j < ITERS; ++j) {
auto local_ref = get_singleton<CountedObject>();
local_ref.reset();
}
});
for (auto& t : threads)
t.join();
}
在我的系统(ARM64 Linux, g++ 7.5.0)上,多线程测试通常会失败:
[==========] Running 2 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 2 tests from GetSingletonTest
[ RUN ] GetSingletonTest.SingleThreaded
[ OK ] GetSingletonTest.SingleThreaded (0 ms)
[ RUN ] GetSingletonTest.MultiThreaded
terminate called after throwing an instance of 'std::runtime_error'
what(): Constructed 2 counted objects
Aborted (core dumped)
为了简洁起见,我从代码中省略了 cout
消息,但我单独添加了消息来调试正在发生的事情:
[thread id, message]
...
547921465808 Acquired lock
547921465808 Weak ptr expired, constructing
547921465808 Releasing lock
547929858512 Acquired lock
547929858512 Weak ptr expired, constructing
terminate called after throwing an instance of 'std::runtime_error'
what(): Constructed 2 counted objects
Aborted (core dumped)
看来线程1判断单例已经过期,重新构造了单例。然后线程 2 唤醒并且 也 确定单例已经过期,即使线程 1 刚刚重新填充它 - 就好像线程 2 正在使用“过时”版本弱指针。
如何使线程 1 对 weak
的赋值立即对线程 2 可见?这可以用 C++20 的 atomic<weak_ptr<T>>
实现吗?我的项目仅限于 C++17,所以不幸的是这不是我的选择。
感谢您的帮助!
锁定下的赋值对检查weak_ptr
锁定下的任何其他线程都是可见的。
存在多个实例的原因是析构函数的操作顺序不能保证。 !weak.lock()
不等同于“可以肯定,没有这个对象的实例”。相当于“肯定有这个对象的一个实例”的倒数。可能会有这样的例子,因为引用计数在删除者有机会采取行动之前就已经递减了。
想象一下这个序列:
- 线程 1:
~shared_ptr
被调用。
- 线程1:强引用计数递减。
- 线程 1:它比较等于零,删除器开始。
- 线程 2:
get_singleton
被调用。
- 线程2:获取锁
- 线程2:
!weak.lock()
满足,构造新实例
- 线程 1:...同时,我们正在等待获取锁...
- 线程2:构造新实例,赋值,释放锁
- 线程1:删除者现在可以获取锁,删除原来的实例。
编辑:
为此,您需要一些指示不存在实例的指示器。只有在实例被销毁后才必须重置。其次,如果遇到实例存在但正在被销毁的情况,您需要决定如何处理:
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
static bool instance;
while (true) {
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (shared)
return shared;
if (instance)
// allow deleters to make progress and try again
continue;
instance = true;
shared.reset(new T, [](T* ptr){
scoped_lock lk(mtx);
delete ptr;
instance = false;
});
weak = shared;
return shared;
}
}
换句话说,状态机中有这些状态:
- 不存在T
- 正在构建T,但还不能访问
- T 存在并且可以访问
- T正在被销毁,无法再访问
bool 让我们知道我们处于状态 #4。一种方法是循环直到我们不再处于状态#4。我们放下锁,以便删除者可以取得进展。当我们重新获取锁时,另一个线程可能已经突袭并创建了实例,因此我们需要从顶部开始 re-check 一切。
非常感谢 Jeff 的解释和回答。我接受他的回答,但我想 post 一个替代实现,在我的(公认的有限的)测试中,它表现得稍微好一些。您的里程可能会有所不同。
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
static atomic<bool> deleted{true};
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (!shared) {
// The refcount is 0, but is the object actually deleted yet?
// Spin until it is.
bool expected;
do {
expected = true;
} while (!deleted.compare_exchange_weak(expected, false));
// The previous object is definitely deleted. Ready to make a new one.
shared.reset(new T, [](T* ptr){
delete ptr;
deleted.store(true);
});
weak = shared;
}
return shared;
}
这会从删除器中移除锁定获取,并用原子标志替换它,当对象实际被删除时该标志变为真。
我正在编写一个 returns 一个 shared_ptr 到单例的函数。当所有引用都消失时,我希望单例对象被销毁。我的解决方案基于使用静态 weak_ptr
和 mutex
的 this accepted answer,但我对其线程安全性的测试并没有始终如一地通过。
这里有一个完整的例子来说明问题。
#include <gtest/gtest.h>
#include <atomic>
#include <mutex>
#include <memory>
#include <vector>
#include <thread>
using namespace std;
// Number of instances of this class are tracked in a static counter.
// Used to verify the get_singleton logic (below) is correct.
class CountedObject {
private:
static atomic<int> instance_counter;
public:
CountedObject() {
int prev_counter = instance_counter.fetch_add(1);
if (prev_counter != 0)
// Somehow, 2 objects exist at the same time. Why?
throw runtime_error("Constructed " + to_string(prev_counter + 1) +
" counted objects");
}
~CountedObject() {
instance_counter.fetch_sub(1);
}
static int count() {
return instance_counter.load();
}
};
atomic<int> CountedObject::instance_counter{0};
// Returns reference to a singleton that gets destroyed when all references
// are destroyed.
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (!shared) {
shared.reset(new T, [](T* ptr){
scoped_lock lk(mtx);
delete ptr;
});
weak = shared;
}
return shared;
}
// This test passes consistently.
TEST(GetSingletonTest, SingleThreaded) {
ASSERT_EQ(CountedObject::count(), 0);
auto ref1 = get_singleton<CountedObject>();
auto ref2 = get_singleton<CountedObject>();
ASSERT_EQ(CountedObject::count(), 1);
ref1.reset();
ASSERT_EQ(CountedObject::count(), 1);
ref2.reset();
ASSERT_EQ(CountedObject::count(), 0);
}
// This test does NOT pass consistently.
TEST(GetSingletonTest, MultiThreaded) {
const int THREAD_COUNT = 2;
const int ITERS = 1000;
vector<thread> threads;
for (int i = 0; i < THREAD_COUNT; ++i)
threads.emplace_back([ITERS]{
// Repeatedly obtain and release references to the singleton.
// The invariant must hold that at most one instance ever exists
// at a time.
for (int j = 0; j < ITERS; ++j) {
auto local_ref = get_singleton<CountedObject>();
local_ref.reset();
}
});
for (auto& t : threads)
t.join();
}
在我的系统(ARM64 Linux, g++ 7.5.0)上,多线程测试通常会失败:
[==========] Running 2 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 2 tests from GetSingletonTest
[ RUN ] GetSingletonTest.SingleThreaded
[ OK ] GetSingletonTest.SingleThreaded (0 ms)
[ RUN ] GetSingletonTest.MultiThreaded
terminate called after throwing an instance of 'std::runtime_error'
what(): Constructed 2 counted objects
Aborted (core dumped)
为了简洁起见,我从代码中省略了 cout
消息,但我单独添加了消息来调试正在发生的事情:
[thread id, message]
...
547921465808 Acquired lock
547921465808 Weak ptr expired, constructing
547921465808 Releasing lock
547929858512 Acquired lock
547929858512 Weak ptr expired, constructing
terminate called after throwing an instance of 'std::runtime_error'
what(): Constructed 2 counted objects
Aborted (core dumped)
看来线程1判断单例已经过期,重新构造了单例。然后线程 2 唤醒并且 也 确定单例已经过期,即使线程 1 刚刚重新填充它 - 就好像线程 2 正在使用“过时”版本弱指针。
如何使线程 1 对 weak
的赋值立即对线程 2 可见?这可以用 C++20 的 atomic<weak_ptr<T>>
实现吗?我的项目仅限于 C++17,所以不幸的是这不是我的选择。
感谢您的帮助!
锁定下的赋值对检查weak_ptr
锁定下的任何其他线程都是可见的。
存在多个实例的原因是析构函数的操作顺序不能保证。 !weak.lock()
不等同于“可以肯定,没有这个对象的实例”。相当于“肯定有这个对象的一个实例”的倒数。可能会有这样的例子,因为引用计数在删除者有机会采取行动之前就已经递减了。
想象一下这个序列:
- 线程 1:
~shared_ptr
被调用。 - 线程1:强引用计数递减。
- 线程 1:它比较等于零,删除器开始。
- 线程 2:
get_singleton
被调用。 - 线程2:获取锁
- 线程2:
!weak.lock()
满足,构造新实例 - 线程 1:...同时,我们正在等待获取锁...
- 线程2:构造新实例,赋值,释放锁
- 线程1:删除者现在可以获取锁,删除原来的实例。
编辑:
为此,您需要一些指示不存在实例的指示器。只有在实例被销毁后才必须重置。其次,如果遇到实例存在但正在被销毁的情况,您需要决定如何处理:
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
static bool instance;
while (true) {
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (shared)
return shared;
if (instance)
// allow deleters to make progress and try again
continue;
instance = true;
shared.reset(new T, [](T* ptr){
scoped_lock lk(mtx);
delete ptr;
instance = false;
});
weak = shared;
return shared;
}
}
换句话说,状态机中有这些状态:
- 不存在T
- 正在构建T,但还不能访问
- T 存在并且可以访问
- T正在被销毁,无法再访问
bool 让我们知道我们处于状态 #4。一种方法是循环直到我们不再处于状态#4。我们放下锁,以便删除者可以取得进展。当我们重新获取锁时,另一个线程可能已经突袭并创建了实例,因此我们需要从顶部开始 re-check 一切。
非常感谢 Jeff 的解释和回答。我接受他的回答,但我想 post 一个替代实现,在我的(公认的有限的)测试中,它表现得稍微好一些。您的里程可能会有所不同。
template <typename T>
std::shared_ptr<T> get_singleton() {
static mutex mtx;
static weak_ptr<T> weak;
static atomic<bool> deleted{true};
scoped_lock lk(mtx);
shared_ptr<T> shared = weak.lock();
if (!shared) {
// The refcount is 0, but is the object actually deleted yet?
// Spin until it is.
bool expected;
do {
expected = true;
} while (!deleted.compare_exchange_weak(expected, false));
// The previous object is definitely deleted. Ready to make a new one.
shared.reset(new T, [](T* ptr){
delete ptr;
deleted.store(true);
});
weak = shared;
}
return shared;
}
这会从删除器中移除锁定获取,并用原子标志替换它,当对象实际被删除时该标志变为真。