在共享内存中移动 boost::interprocess::string

move boost::interprocess::string in shared memory

我想实现一些消息队列(基于向量)以某种方式处理来自网络的数据,为了做到这一点,我使用共享内存来保存消息,我遇到了一个与之相关的问题,事情是我的代码在我第一次 运行 时运行良好,当我想再次 运行 它时,当我想为共享内存中的队列中的字符串分配一个新值时,我得到了 segfaut,实际上在我的情况下,当我想移动它时(当我想复制它时存在同样的问题)。当 SSO 工作时,问题不存在,所以当我有足够小的字符串时。我做错了什么?

#include <atomic>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>

namespace bip = boost::interprocess;

struct BadSharedMemoryAccess final : public std::exception
{
    BadSharedMemoryAccess(std::string&& msg):
        msg_{std::move(msg)}
{}

virtual const char* what() const noexcept
{
    return msg_.c_str();
}

private:
    std::string msg_;
};

struct Message
{
    bip::string message_;
};

template<typename Alloc>
class MyCustomData final
{
public:
    using allocator_type = typename Alloc::template rebind<Message>::other;

    MyCustomData(std::size_t number_of_messages, Alloc alloc = {}) :
        init_add_index_{0},
        init_handle_index_{-1},
        messages_{number_of_messages, alloc}
    {}

public:
    uint_fast64_t init_add_index_;
    int_fast64_t init_handle_index_;
    std::vector<Message, Alloc> messages_;
//    bip::vector<data::Message, Alloc> messages_;
};

template<typename DataType, typename DataAllocator>
class SharedMemory
{
public:
    template<typename... Args>
    SharedMemory(std::string const& shm_segment_name, std::size_t const segment_size,
        std::string const& shm_object_name, Args&&... args) :
            shm_object_name_{shm_object_name}
    {
        std::cout << "attempt to allocate space for shared memory segment " << shm_segment_name
              << ", size: ." << segment_size << std::endl;
        setSharedMemorySize(shm_segment_name, segment_size);

        DataAllocator const allocInstance{shm_.get_segment_manager()};
        data_ = shm_.find_or_construct<DataType>(shm_object_name.c_str())(std::forward<Args>(args)..., allocInstance);
        if (data_)
            std::cout << "shared memory segment has been allocated" << std::endl;
        else
            std::cout << "shared memory has not been constructed or founded" << std::endl;
    }

    virtual ~SharedMemory()
    {
        std::cout << "shared memory segment will be closed." << std::endl;
    }

    void setSharedMemorySize(std::string const& shm_segment_name, std::size_t const segment_size)
    {
        auto page_size = bip::mapped_region::get_page_size();
        auto const page_increase_rate{2};
        while (page_size < segment_size)
        {
            page_size *= page_increase_rate;
        }

        std::cout <<"seting page size: " << page_size << std::endl;
        shm_ = bip::managed_shared_memory{bip::open_or_create, shm_segment_name.c_str(), page_size};
        std::cout << "space for shared memory has been successfully allocated." << std::endl;
    }

    DataType& getData()
    {
        if (not data_)
            throw BadSharedMemoryAccess{"cannot access " + shm_object_name_};
        return *data_;
    }

protected:
    DataType* data_;

private:
    std::string const shm_object_name_;
    bip::managed_shared_memory shm_;
};

namespace sharable
{
    using DataAllocator = bip::allocator<Message, bip::managed_shared_memory::segment_manager>;
    template<typename Alloc>
    using DataType = MyCustomData<Alloc>;
}

int main()
{
    std::size_t const max_number_of_elements_in_container{1000000};
    auto shmem_data = std::make_shared<SharedMemory<MyCustomData<sharable::DataAllocator>, sharable::DataAllocator>>(
        "SHM_SEGMENT", sizeof(MyCustomData<sharable::DataAllocator>) +
            (max_number_of_elements_in_container * sizeof(Message) * 2),
        "SHM_CONTAINER", max_number_of_elements_in_container);

    std::vector<bip::string> feed{max_number_of_elements_in_container};
    for (std::size_t i = 0; i < max_number_of_elements_in_container; ++i)
    {
        std::string s{"blablabla11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" + std::to_string(i)};
        feed[i] = s.c_str();
    }

    auto& data = shmem_data->getData();
    auto& shmem_vec = data.messages_;
    std::cout << "addr: " << shmem_vec.data() << std::endl;
    for (std::size_t i = 0; i < max_number_of_elements_in_container; ++i)
    {
//        if (i == 0)
//            std::cout << "msg: " << shmem_vec[i].message_ << std::endl;
        auto msg = feed[i];
        shmem_vec[i].message_ = std::move(msg);
    }
    return 0;
}
  1. 您没有为字符串使用共享内存分配器。从这个意义上说 你的问题和 。 您可能想阅读它以获得一般介绍。

  2. 您的示例通过将字符串包装到您自己的字符串中使事情变得复杂 结构。这意味着你会得到很多繁琐的工作 分配器。对于“uses_allocator”方法,它与 scoped_allocator_adaptor - 可以减轻一些痛苦,例如参见 making non-shared copies of boost::interprocess shared memory objects.

  3. 看了你剩下的代码,我有点糊涂了。你为什么要模板 您的 SharedMemory 类型带有分配器?我的意思是,SharedMemory 应该是单点负责选权和传权 分配器,对吧?它如何与外部提供的分配器一起工作。

  4. 有没有用到的typedef,你为每个object新建一个segment 即使它可能来自相同的共享内存(映射相同的页面 多次进入内存)。然而你不知何故认为分享很重要 一个这样的实例的所有权 (make_shared).

  5. 尺寸计算是错误的:他们只考虑了尺寸 你的 Message 结构,而不是分配的字符串数据。你好像有 忘了映射内存也是虚拟内存。底层存储 将能够稀疏分配。那么,为什么不预留大量的 记忆,当你 运行 出来时才回应?

  6. 你在谈论和编码(一些)移动语义,但是你写:

    for (std::size_t i = 0; i < max_number_of_elements_in_container; ++i) {
        auto msg = feed[i];
        shmem_vec[i].message_ = std::move(msg);
    }
    

    这很困惑。如果你这样做有什么好处(如果有效,见下文) 无论如何先制作一个显式副本:

        auto msg = feed[i];
    
  7. 这些令人担忧的迹象:

    uint_fast64_t init_add_index_;
    int_fast64_t  init_handle_index_;
    

    看起来您可能打算同时使用多个 processes/threads²。在这种情况下,您必须添加同步或使用 atomic<> 至少打字。

总结 在我看来,你可能很难隐藏 你不小心增加了它的复杂性。

搬家

你问的是“在共享内存中移动共享字符串”。对于这部分 问题,让我们假设你实际上有你的字符串分配共享 内存。

看看移动的弦是如何工作的,不难看出移动的弦 在共享内存中 的工作方式与将它们在堆中移动完全一样 会起作用:对象地址会不同,但指向的内部指针 分配的内存将相同。

然而,代码做了其他事情:它不会移动inside shared 记忆。它试图将 移动到 共享内存。 这将 显然不安全 因为共享内存中的对象不能有用地指向 共享内存段之外的任何东西(任何其他进程都会调用 通过此类指针间接进行的未定义行为)。

像往常一样,在 C++ 中你是自己的一部分,以防止发生这样的事故:C++11 basic_string<>::swap指定

The behavior is undefined if Allocator does not propagate on swap and the allocators of *this and other are unequal.

移动构造函数is specified具有复杂性:

constant. If alloc is given and alloc != other.get_allocator(), then linear

注意当copying/moving个容器(basic_string<>是一个容器,类似于std::vector<>)分配器的语义就更复杂了:

要做什么?

总而言之,如果幸运的话,移动不会编译,因为分配器的类型不兼容并且提供了 none(例如,通过 uses_allocator 协议)。

如果你不那么幸运,它会编译但(幸运的是)不会执行移动,因为它检测到分配器“不相等”,因此它会回退到复制存储。

如果你绝对不走运,你选择了类型兼容的配置,并且分配器未配置为在容器上安全传播 move/copy,或者其他情况导致分配器无法检测到“不兼容” "¹,你最终得到了 UB。

在这种情况下,有一个更简单的选择:你知道你不能移动。因此,不要要求移动

规避风险。

一些治愈我们伤口的代码

在分解了代码和问题中的许多复杂性之后,让我们建设性地展示我们可以做些什么来解决问题:

#include <exception>
#include <iomanip>
#include <iostream>
#include <random>

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>

namespace bip = boost::interprocess;

struct BadSharedMemoryAccess final : std::runtime_error {
    BadSharedMemoryAccess(std::string msg) : std::runtime_error{ std::move(msg) } {}
};

这是前奏。现在,让我们表明我们的意图:

using Segment = bip::managed_shared_memory;
template <typename U> using Alloc = bip::allocator<U, Segment::segment_manager>;

这使得引用(并可能切换出)段及其分配器变得容易。

using Message       = bip::string;
using Feed          = bip::vector<Message>;
using SharedMessage = bip::basic_string<char, std::char_traits<char>, Alloc<char> >;
using SharedFeed    = bip::vector<SharedMessage, Alloc<SharedMessage> >;

简单地定义我们的领域实体。 通过对堆和共享分配版本使用 bip::string/bip::vector,我们获得了两者之间的最佳互操作;

class MyCustomData final {
  public:
    using allocator_type = SharedFeed::allocator_type;

    MyCustomData(std::size_t capacity, allocator_type alloc)
        : messages_(capacity, SharedMessage(alloc), alloc) // don't brace initlaize
    { }

    auto&       messages()       { return messages_; }
    auto const& messages() const { return messages_; }

  private:
    uint_fast64_t init_add_index_ = 0;
    int_fast64_t  init_handle_index_ = -1;
    SharedFeed messages_;
};

For now, dropped the virtual destructor, and the Message struct that simply wrapped a bip::string for convenience.

template <typename T> class SharedMemory final {
  public:
    template <typename... Args>
    SharedMemory(std::string const& shm_segment_name,
                 std::size_t const segment_size,
                 std::string const& shm_object_name,
                 Args&&... args)
        : shm_ { bip::open_or_create, shm_segment_name.c_str(), segment_size }
    {
        data_ = shm_.find_or_construct<T>
            (shm_object_name.c_str())
            (std::forward<Args>(args)...,
             shm_.get_segment_manager())
            ;

        if (!data_) throw BadSharedMemoryAccess {"cannot access " + shm_segment_name + "/" + shm_object_name};
    }

    T const& get() const { return *data_; }
    T&       get()       { return *data_; }

    auto free() const { return shm_.get_free_memory(); }
  protected:
    T* data_;

  private:
    Segment shm_;
};

It strikes me that SharedMemory has too many responsibilities: on the one hand it tries to be a "smart-reference" for shared objects, and on the other hand it "manages a segment". This leads to problems if you actually wanted to have multiple objects in a segment. Consider splitting into Shared::Segment and Shared::Object<T>.

Feed generate_heap_feed(size_t n) {
    Feed feed;
    feed.reserve(n);
    for (size_t i = 0; i < n ; ++i) {
        feed.emplace_back(
            "blablabla11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"
            + std::to_string(i));
    }
    return feed;
}

main 中提取了测试源生成器。

int main() {
    static constexpr std::size_t capacity { 1000000 };
    static constexpr auto estimate = 300ull << 20; // 300 MiB (<< 10 kilo, << 20 mebi, << 30 gibi)

用慷慨的估计替换了错误的计算³。请参阅下面的测量值。

    using SharedData = SharedMemory<MyCustomData>;
    SharedData shmem_data("SHM_SEGMENT", estimate, "SHM_CONTAINER", capacity);
    std::cout << "Free: " << shmem_data.free() << "\n";

漂亮且可读。在我的系统上首先打印 "Free: 282572448" 运行.

    Feed const feed      = generate_heap_feed(capacity);
    SharedFeed& shm_feed = shmem_data.get().messages();

现在我们有并排的提要,让我们复制:

    // copy feed from heap to shm
    auto const n = std::min(feed.size(), shm_feed.size());
    std::copy_n(feed.begin(), n, shm_feed.begin());

    std::cout << "Copied: " << n << "\n";
    std::cout << "Free: " << shmem_data.free() << "\n";

就是这样。我们不尝试移动,因为我们知道那行不通。 bip::basic_string 正确知道如何在不兼容的分配器之间进行复制。没有汗水。

为了更好地衡量,让我们打印一些诊断信息:

    {
        // check some random samples
        std::default_random_engine prng{std::random_device{}()};
        auto pick = [&] { return std::uniform_int_distribution<>(0, n-1)(prng); };

        for (auto index : {pick(), pick(), pick(), pick()}) {
            std::string_view a = feed.at(index);
            std::string_view b = shm_feed.at(index);
            std::cout << "Message #" << index
                << (a == b? " OK":" FAIL")
                << " " << std::quoted(b) << std::endl;
        }
    }
}

看到了Live On Coliru⁴

打印,例如:

Especially note the filesize measurements (--apparent-size vs. the size on disk). This confirms my point about sparse allocation. Even if you reserved 100TB, the effective size of the SHM_CONTAINER would still be 182MiB.

奖金部分

作用域分配器适配器

只需替换一行:

template <typename U> using Alloc = bip::allocator<U, Segment::segment_manager>;

template <typename U> using Alloc = boost::container::scoped_allocator_adaptor<
    bip::allocator<U, Segment::segment_manager> >;

可以解决问题,解锁神奇的分配器传播,例如从向量到 构造其元素时的字符串(使用 emplaceassign)。所以我们可以 进一步简化 copy_n 来自:

// copy feed from heap to shm
auto const n = std::min(feed.size(), shm_feed.size());
std::copy_n(feed.begin(), n, shm_feed.begin());

std::cout << "Copied: " << n << "\n";

简单地说:

shm_feed.assign(feed.begin(), feed.end());
std::cout << "Copied: " << shm_feed.size() << "\n";

它具有与以前完全相同的分配行为。 也看到了Live On Coliru

多态分配器 (c++17)

这不会从根本上改变任何事情,除了:

  • 它会使 Feed/SharedFeed 和 Message/SharedMessage 共享相同的静态类型
  • 默认情况下它会像以前一样具有作用域分配器行为

然而,在我们在标准中获得对奇特指针的适当支持之前,这是一个白日梦:

再次创建 Message 结构?

嗯。更像是“再次斗争”。我承认我讨厌编写可识别分配器的数据类型。这无疑不是最优的,但这是我可以做的最少的事情:

template <typename Alloc>
struct BasicMessage {
    // pre-c++17:
    //  using allocator_type = typename Alloc::template rebind<char>::other;
    using allocator_type = typename std::allocator_traits<Alloc>::template rebind_alloc<char>;

    BasicMessage(std::allocator_arg_t, allocator_type alloc)
        : _msg(alloc) { }

    template <typename T1, typename... T,
             typename = std::enable_if_t<
                    not std::is_same_v<std::allocator_arg_t, std::decay_t<T1> >
                 >
        >
    explicit BasicMessage(T1&& a, T&&... init)
        : _msg(std::forward<T1>(a), std::forward<T>(init)...) { }

    template <typename OtherAlloc>
    BasicMessage(BasicMessage<OtherAlloc> const& other, allocator_type alloc)
        : _msg(other.message().begin(), other.message().end(), alloc) { }

    template <typename OtherAlloc, typename OM = BasicMessage<OtherAlloc> >
    std::enable_if_t<
        not std::is_same_v<allocator_type, typename OM::allocator_type>,
        BasicMessage&>
    operator=(BasicMessage<OtherAlloc> const& other) {
        _msg.assign(other.message().begin(), other.message().end());
        return *this;
    }

    template <typename OtherAlloc>
    BasicMessage(std::allocator_arg_t, allocator_type alloc, BasicMessage<OtherAlloc> const& other)
        : _msg(other.message().begin(), other.message().end(), alloc) { }

    BasicMessage(BasicMessage const&) = default;
    BasicMessage(BasicMessage&&) = default;
    BasicMessage& operator=(BasicMessage const&) = default;
    BasicMessage& operator=(BasicMessage&&) = default;

    auto& message() const { return _msg; }
    auto& message()       { return _msg; }
  private:
    bip::basic_string<char, std::char_traits<char>, allocator_type> _msg;
};

using Message       = BasicMessage<std::allocator<char> >;
using Feed          = bip::vector<Message>;
using SharedMessage = BasicMessage<Alloc<char> >;
using SharedFeed    = bip::vector<SharedMessage, Alloc<SharedMessage> >;

从好的方面来说,由于 scoped_allocator_adaptor 上面介绍的修复。也许如果那不是我们想要的, 你可以少一点复杂性。

在其他地方进行了细微的界面更改:

: messages_(capacity, SharedMessage(std::allocator_arg, alloc), alloc) // don't brace initlaize

    std::string_view a = feed.at(index).message();
    std::string_view b = shm_feed.at(index).message();

一切仍然有效,参见Live On Coliru


¹ 不是标准语,因此是引号

² 我怀疑您可能正在尝试实施 Disruptor 模式

³ 见

⁴ 将 managed_shared_memory 替换为 manage_mapped_file 并由于 Coliru 限制

减少了容量