std::atomic<std::shared_ptr<T>> 与非平凡对象的正确用法?

Correct usage of std::atomic<std::shared_ptr<T>> with non-trivial object?

我正在尝试通过 std::atomic<std::shared_ptr>> 实现无锁包装器以对容器等非平凡对象进行操作。 我在这两个主题中找到了一些相关信息:

但它仍然不是我需要的。

举个例子:

TEST_METHOD(FechAdd)
    {
        constexpr size_t loopCount = 5000000;
        auto&& container           = std::atomic<size_t>(0);
        auto thread1               = std::jthread([&]()
              {
                  for (size_t i = 0; i < loopCount; i++)
                      container++;
              });

        auto thread2 = std::jthread([&]()
            {
                for (size_t i = 0; i < loopCount; i++)
                    container++;
            });
        thread1.join();
        thread2.join();
        Assert::AreEqual(loopCount * 2, container.load());
    }

此函数可以正常工作,因为 post 增量运算符使用内部 fetch_add() 原子操作。

另一方面:

TEST_METHOD(LoadStore)
{
    constexpr size_t loopCount = 5000000;
    auto&& container           = std::atomic<size_t>(0);
    auto thread1               = std::jthread([&]()
          {
              for (size_t i = 0; i < loopCount; i++)
              {
                  auto value = container.load();
                  value++;
                  container.store(value);
              }
          });

    auto thread2 = std::jthread([&]()
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                auto value = container.load();
                value++;
                container.store(value);
            }
        });
    thread1.join();
    thread2.join();
    Assert::AreEqual(loopCount * 2, container.load());
}

而如果我将其替换为 .load().store() 操作以及这两个操作之间的递增,结果是不一样的。 那是两个原子操作,所以这些操作之间不能做同步。

我的最终目标是 std::atomic<std::shared_ptr> 加载对象的实际状态,执行一些非常量操作,然后再次通过存储操作保存它。

TEST_METHOD(AtomicSharedPtr)
{
    constexpr size_t loopCount = 5000000;
    auto&& container           = std::atomic(std::make_shared<std::unordered_set<int>>());
    auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
          {
              for (size_t i = 0; i < loopCount; i++)
              {
                  // some other lock-free synchronization primitives as barrier, conditions or?
                  auto reader = container.load();
                  reader->emplace(5);
                  container.store(reader);
              }
          });

    auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
        {
            for (size_t i = 0; i < loopCount; i++)
            {
                // some other lock-free synchronization primitives as barrier, conditions or?
                auto reader = container.load();
                reader->erase(5);
                container.store(reader);
            }
        });
}

我知道第二个线程也只有 shared_ptr 来自 shared_ptr、which can only cause data race.

上的原子和非常量操作

关于如何实现无锁包装器的任何提示,该包装器将与存储在 std::atomic<std::shared_ptr> 中的对象的非常量操作一起工作?

首先,旁注。 std::atomic<std::shared_ptr<T>> 提供对 指针 的原子访问,并为 T 提供 无任何同步 。在这里要注意这一点非常重要。并且您的代码显示您正在尝试同步 T,而不是指针,因此 atomic 没有按照您的想法进行。为了使用 std::atomic<std::shared_ptr<T>>,您必须将指向的 T 视为 const

有两种方法可以线程安全的方式处理任意数据的读取-修改-写入。显然,第一个是使用锁。这通常执行起来更快,并且由于其简单性,通常错误较少,因此强烈建议使用。如果你真的想用原子操作来做到这一点,那是很困难的,而且执行起来更慢。

它通常看起来像这样,您对指向的数据进行深度复制,改变副本,然后尝试用新数据替换旧数据。如果其他人在此期间更改了数据,则您将其全部丢弃并重新开始整个突变。

template<class T, class F>
bool readModifyWrite(std::atomic<std::shared_ptr<T>>& container, F&& function) {
   do {
       const auto&& oldT = container.load();
       //first a deep copy, to enforce immutability
       auto&& newT = std::make_shared(oldT.get());
       //then mutate the T
       if (!function(*newT))
           return false; //function aborted
       //then attempt to save the modified T.
       //if someone else changed the container during our modification, start over
  } while(container.compare_exchange_strong(oldT, newT) == false);
    //Note that this may take MANY tries to eventually succeed.
    return true;
}

然后用法与您的用法类似:

auto&& container           = std::atomic(std::make_shared<std::unordered_set<int>>());
auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
      {
          for (size_t i = 0; i < loopCount; i++)
          {
              readModifyWrite(container, [](auto& reader) {
                 reader.emplace(5);
                 return true;
              });
          }
      });

auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; i++)
        {
              readModifyWrite(container, [](auto& reader) {
                 reader.erase(5);
                 return true;
              });
        }
    });
}

请注意,由于一个线程正在插入 5 loopCount 次,而另一个正在擦除 5 loopCount 次,但它们之间并不同步,第一个线程可能连续写入几次(这是一个集合的空操作)然后第二个线程可能连续擦除几次(这是一个集合的空操作),所以你不确实可以保证这里的最终结果,但我假设你知道这一点。

但是,如果您想使用突变进行同步,那会变得相当复杂。变异函数必须 return 如果它成功或中止,然后 readModifyWrite 的调用者必须处理修改中止的情况。 (请注意,readModifyWrite 有效地 returns 来自函数的值,因此它 returns 来自修改步骤的值。写入步骤不影响 return 值)

auto thread1               = std::jthread([&]([[maybe_unused]] std::stop_token token)
      {
          for (size_t i = 0; i < loopCount; )
          {
              bool did_emplace = readModifyWrite(container, [](auto& reader) {
                 return reader.emplace(5);
              });
              if (did_emplace) i++;
          }
      });

auto thread2 = std::jthread([&]([[maybe_unused]] std::stop_token token)
    {
        for (size_t i = 0; i < loopCount; )
        {
              bool did_erase = readModifyWrite(container, [](auto& reader) {
                 return reader.erase(5);
              });
              if (did_erase) i++;
        }
    });
}