创建一个在 boost 中使用多个 managed_shared_memory 段的分配器

Creating an allocator that uses multiple managed_shared_memory segments in boost

为了解决 managed_shared_memory segment without unmapping and remapping all the previous regions, I want to create an allocator that creates a new managed_shared_memory segment whenever there is not enough space in the previous segments. I have looked into the boost interprocess node allocators 的增长问题,但它们似乎不太适合解决这个问题。 boost 中是否有任何 class 或实用程序可以帮助解决此问题?

不是问题的直接匹配,而是评论线程的相关成果,这里我举一个例子,使用 managed_external_buffer 来实现对 on-disk 格式的更多控制(预见向后兼容的版本控制也许还有一些完整性验证)并展示了如何实现增长

Live On Compiler Explorer

#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/mapped_region.hpp>

// sample data structures:
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>

// filesystem stuff
#include <fcntl.h>
#include <filesystem>
#include <fstream>
#include <sys/stat.h>

// convenience:
static constexpr char const* FILENAME = "data.bin";
static auto operator""_MB(unsigned long long n) { return n << 20; }
#include <algorithm>
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <iostream>

namespace bip = boost::interprocess;
namespace fs  = std::filesystem;
using boost::adaptors::indirected;

using Segment                     = bip::managed_external_buffer;
template <typename T> using Alloc = bip::allocator<T, Segment::segment_manager>;
using String    = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using Database  = bip::vector<String, Alloc<String>>;
using StringPtr = bip::offset_ptr<String>;
using Index     = bip::vector<StringPtr, Alloc<StringPtr>>;

struct MySharedSegment {
    MySharedSegment(bip::create_only_t, char const* filename, size_t size) {
        if (fs::exists(filename))
            throw std::runtime_error("file already exists");
        {
            std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
        }

        grow(filename, size);
        fm = bip::file_mapping(filename, bip::mode_t::read_write);

        auto offset = write_magic_header();

        buf = bip::mapped_region(fm, bip::mode_t::read_write, offset,
                                 size - offset);
        mb  = Segment(bip::create_only, buf.get_address(), buf.get_size());
        auto* mgr = mb.get_segment_manager();

        _vec   = mb.find_or_construct<Database>("vec")(mgr);
        _index = mb.find_or_construct<Index>("index")(mgr);
    }

    MySharedSegment(bip::open_only_t, char const* filename)
    {
        if (!fs::exists(filename))
            throw std::runtime_error("file not found");
        fm = bip::file_mapping(filename, bip::mode_t::read_write);

        auto offset = check_magic_header();
        buf         = bip::mapped_region(fm, bip::mode_t::read_write, offset);

        mb = Segment(bip::open_only, buf.get_address(), buf.get_size());
        if (buf.get_size() > mb.get_size()) {
            // also grow segment if buffer grew
            mb.grow(buf.get_size() - mb.get_size());
        }

        auto [v, vok] = mb.find<Database>("vec");
        auto [i, iok] = mb.find<Index>("index");

        if (!(v && vok && i && iok)) {
            throw std::runtime_error("an expected object was not found");
        }

        _vec   = v;
        _index = i;
    }

    static void grow(char const* filename, size_t extra) {
        fs::resize_file(filename, std::filesystem::file_size(filename) + extra);
    }

    Database& database() {
        assert(_vec);
        return *_vec;
    }

    Index& index() {
        assert(_index);
        return *_index;
    }

    Segment::segment_manager* get_segment_manager() {
        return mb.get_segment_manager();
    }

  private:
    size_t write_magic_header() {
        auto HLEN = v1_magic_header.size();

        if (fs::file_size(fm.get_name()) < HLEN)
            throw std::runtime_error("File short");

        bip::mapped_region mr(fm, bip::mode_t::read_write, 0, HLEN);

        auto out = reinterpret_cast<uint8_t*>(mr.get_address());
        auto nxt =
            std::copy(v1_magic_header.begin(), v1_magic_header.end(), out);
        assert(size_t(nxt - out) == HLEN);
        return HLEN;
    }

    size_t check_magic_header() {
        auto HLEN = v1_magic_header.size();

        if (fs::file_size(fm.get_name()) >= HLEN) {
            bip::mapped_region mr(fm, bip::mode_t::read_only, 0, HLEN);

            if (std::equal(
                    v1_magic_header.begin(), v1_magic_header.end(),
                    reinterpret_cast<uint8_t const*>(mr.get_address()))) {
                return HLEN;
            }
        }
        // TODO future adds newer versions with different on disk formats
        throw std::runtime_error("Unknown database file format");
    }

    bip::file_mapping  fm;
    bip::mapped_region buf;
    Segment            mb;

    Database* _vec   = nullptr;
    Index*    _index = nullptr;

    static constexpr std::array<uint8_t, 16> v1_magic_header = {
        0x27, 0x65, 0xb6, 0xcb, 0x3a, 0x86, 0xf5, 0x48,
        0xba, 0xa3, 0x2c, 0x49, 0x00, 0xdd, 0x6f, 0xde,
    };
};

void create_initial(int size) {
    MySharedSegment mss(bip::create_only, FILENAME, size);

    auto* mgr = mss.get_segment_manager();
    auto& db  = mss.database();

    db.emplace_back("one", mgr);
    db.emplace_back("two", mgr);
    db.emplace_back("three", mgr);

    auto& index = mss.index();
    for (auto& elem : db) {
        index.emplace_back(&elem);
    }

    boost::sort(index | indirected);

    for (auto el : index | indirected) {
        std::cout << el << " ";
    }
    std::cout << "\n";
}

void offline_grow_with(int size) { MySharedSegment::grow(FILENAME, size); }

void reopen_and_verify() {
    MySharedSegment mss(bip::open_only, FILENAME);

    // none of the pointers in the index have become invalidated:
    for (auto el : mss.index() | indirected) {
        std::cout << el << " ";
    }
    std::cout << "\n";
}

int main()
{
    std::remove(FILENAME);

    create_initial(1_MB);

    offline_grow_with(1_MB);

    reopen_and_verify();
}

版画

one three two 
one three two 

备注

以上还有requires/assumes线下增长。您可以在控件 headers 中添加一个进程间共享互斥锁(在托管段缓冲区之外)并使用它来进行 reader-writer 锁定,以便其他方在请求增长时自动取消映射该段。