std::atomic<std::shared_ptr<T>> 与非平凡对象的正确用法?
Correct usage of std::atomic<std::shared_ptr<T>> with non-trivial object?
我正在尝试通过 std::atomic<std::shared_ptr>>
实现无锁包装器以对容器等非平凡对象进行操作。
我在这两个主题中找到了一些相关信息:
- atomic usage
但它仍然不是我需要的。
举个例子:
TEST_METHOD(FechAdd)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
此函数可以正常工作,因为 post 增量运算符使用内部 fetch_add()
原子操作。
另一方面:
TEST_METHOD(LoadStore)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
而如果我将其替换为 .load()
和 .store()
操作以及这两个操作之间的递增,结果是不一样的。
那是两个原子操作,所以这些操作之间不能做同步。
我的最终目标是 std::atomic<std::shared_ptr>
加载对象的实际状态,执行一些非常量操作,然后再次通过存储操作保存它。
TEST_METHOD(AtomicSharedPtr)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->emplace(5);
container.store(reader);
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->erase(5);
container.store(reader);
}
});
}
我知道第二个线程也只有 shared_ptr 来自 shared_ptr、which can only cause data race.
上的原子和非常量操作
关于如何实现无锁包装器的任何提示,该包装器将与存储在 std::atomic<std::shared_ptr>
中的对象的非常量操作一起工作?
首先,旁注。 std::atomic<std::shared_ptr<T>>
提供对 指针 的原子访问,并为 T
提供 无任何同步 。在这里要注意这一点非常重要。并且您的代码显示您正在尝试同步 T
,而不是指针,因此 atomic
没有按照您的想法进行。为了使用 std::atomic<std::shared_ptr<T>>
,您必须将指向的 T
视为 const
。
有两种方法可以线程安全的方式处理任意数据的读取-修改-写入。显然,第一个是使用锁。这通常执行起来更快,并且由于其简单性,通常错误较少,因此强烈建议使用。如果你真的想用原子操作来做到这一点,那是很困难的,而且执行起来更慢。
它通常看起来像这样,您对指向的数据进行深度复制,改变副本,然后尝试用新数据替换旧数据。如果其他人在此期间更改了数据,则您将其全部丢弃并重新开始整个突变。
template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
do {
const auto&& oldT = container.load();
//first a deep copy, to enforce immutability
auto&& newT = std::make_shared(oldT.get());
//then mutate the T
if (!function(*newT))
return false; //function aborted
//then attempt to save the modified T.
//if someone else changed the container during our modification, start over
} while(container.compare_exchange_strong(oldT, newT) == false);
//Note that this may take MANY tries to eventually succeed.
return true;
}
然后用法与您的用法类似:
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.emplace(5);
return true;
});
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.erase(5);
return true;
});
}
});
}
请注意,由于一个线程正在插入 5
loopCount
次,而另一个正在擦除 5
loopCount
次,但它们之间并不同步,第一个线程可能连续写入几次(这是一个集合的空操作)然后第二个线程可能连续擦除几次(这是一个集合的空操作),所以你不确实可以保证这里的最终结果,但我假设你知道这一点。
但是,如果您想使用突变进行同步,那会变得相当复杂。变异函数必须 return 如果它成功或中止,然后 readModifyWrite
的调用者必须处理修改中止的情况。 (请注意,readModifyWrite
有效地 returns 来自函数的值,因此它 returns 来自修改步骤的值。写入步骤不影响 return 值)
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_emplace = readModifyWrite(container, [](auto& reader) {
return reader.emplace(5);
});
if (did_emplace) i++;
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_erase = readModifyWrite(container, [](auto& reader) {
return reader.erase(5);
});
if (did_erase) i++;
}
});
}
我正在尝试通过 std::atomic<std::shared_ptr>>
实现无锁包装器以对容器等非平凡对象进行操作。
我在这两个主题中找到了一些相关信息:
- atomic usage
但它仍然不是我需要的。
举个例子:
TEST_METHOD(FechAdd)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
container++;
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
此函数可以正常工作,因为 post 增量运算符使用内部 fetch_add()
原子操作。
另一方面:
TEST_METHOD(LoadStore)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic<size_t>(0);
auto thread1 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
auto thread2 = std::jthread([&]()
{
for (size_t i = 0; i < loopCount; i++)
{
auto value = container.load();
value++;
container.store(value);
}
});
thread1.join();
thread2.join();
Assert::AreEqual(loopCount * 2, container.load());
}
而如果我将其替换为 .load()
和 .store()
操作以及这两个操作之间的递增,结果是不一样的。
那是两个原子操作,所以这些操作之间不能做同步。
我的最终目标是 std::atomic<std::shared_ptr>
加载对象的实际状态,执行一些非常量操作,然后再次通过存储操作保存它。
TEST_METHOD(AtomicSharedPtr)
{
constexpr size_t loopCount = 5000000;
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->emplace(5);
container.store(reader);
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
// some other lock-free synchronization primitives as barrier, conditions or?
auto reader = container.load();
reader->erase(5);
container.store(reader);
}
});
}
我知道第二个线程也只有 shared_ptr 来自 shared_ptr、which can only cause data race.
上的原子和非常量操作关于如何实现无锁包装器的任何提示,该包装器将与存储在 std::atomic<std::shared_ptr>
中的对象的非常量操作一起工作?
首先,旁注。 std::atomic<std::shared_ptr<T>>
提供对 指针 的原子访问,并为 T
提供 无任何同步 。在这里要注意这一点非常重要。并且您的代码显示您正在尝试同步 T
,而不是指针,因此 atomic
没有按照您的想法进行。为了使用 std::atomic<std::shared_ptr<T>>
,您必须将指向的 T
视为 const
。
有两种方法可以线程安全的方式处理任意数据的读取-修改-写入。显然,第一个是使用锁。这通常执行起来更快,并且由于其简单性,通常错误较少,因此强烈建议使用。如果你真的想用原子操作来做到这一点,那是很困难的,而且执行起来更慢。
它通常看起来像这样,您对指向的数据进行深度复制,改变副本,然后尝试用新数据替换旧数据。如果其他人在此期间更改了数据,则您将其全部丢弃并重新开始整个突变。
template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
do {
const auto&& oldT = container.load();
//first a deep copy, to enforce immutability
auto&& newT = std::make_shared(oldT.get());
//then mutate the T
if (!function(*newT))
return false; //function aborted
//then attempt to save the modified T.
//if someone else changed the container during our modification, start over
} while(container.compare_exchange_strong(oldT, newT) == false);
//Note that this may take MANY tries to eventually succeed.
return true;
}
然后用法与您的用法类似:
auto&& container = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.emplace(5);
return true;
});
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; i++)
{
readModifyWrite(container, [](auto& reader) {
reader.erase(5);
return true;
});
}
});
}
请注意,由于一个线程正在插入 5
loopCount
次,而另一个正在擦除 5
loopCount
次,但它们之间并不同步,第一个线程可能连续写入几次(这是一个集合的空操作)然后第二个线程可能连续擦除几次(这是一个集合的空操作),所以你不确实可以保证这里的最终结果,但我假设你知道这一点。
但是,如果您想使用突变进行同步,那会变得相当复杂。变异函数必须 return 如果它成功或中止,然后 readModifyWrite
的调用者必须处理修改中止的情况。 (请注意,readModifyWrite
有效地 returns 来自函数的值,因此它 returns 来自修改步骤的值。写入步骤不影响 return 值)
auto thread1 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_emplace = readModifyWrite(container, [](auto& reader) {
return reader.emplace(5);
});
if (did_emplace) i++;
}
});
auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
{
for (size_t i = 0; i < loopCount; )
{
bool did_erase = readModifyWrite(container, [](auto& reader) {
return reader.erase(5);
});
if (did_erase) i++;
}
});
}