创建一个在 boost 中使用多个 managed_shared_memory 段的分配器
Creating an allocator that uses multiple managed_shared_memory segments in boost
为了解决 managed_shared_memory segment without unmapping and remapping all the previous regions, I want to create an allocator that creates a new managed_shared_memory segment whenever there is not enough space in the previous segments. I have looked into the boost interprocess node allocators 的增长问题,但它们似乎不太适合解决这个问题。 boost 中是否有任何 class 或实用程序可以帮助解决此问题?
不是问题的直接匹配,而是评论线程的相关成果,这里我举一个例子,使用 managed_external_buffer 来实现对 on-disk 格式的更多控制(预见向后兼容的版本控制也许还有一些完整性验证)并展示了如何实现增长
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/mapped_region.hpp>
// sample data structures:
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
// filesystem stuff
#include <fcntl.h>
#include <filesystem>
#include <fstream>
#include <sys/stat.h>
// convenience:
static constexpr char const* FILENAME = "data.bin";
static auto operator""_MB(unsigned long long n) { return n << 20; }
#include <algorithm>
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <iostream>
namespace bip = boost::interprocess;
namespace fs = std::filesystem;
using boost::adaptors::indirected;
using Segment = bip::managed_external_buffer;
template <typename T> using Alloc = bip::allocator<T, Segment::segment_manager>;
using String = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using Database = bip::vector<String, Alloc<String>>;
using StringPtr = bip::offset_ptr<String>;
using Index = bip::vector<StringPtr, Alloc<StringPtr>>;
struct MySharedSegment {
MySharedSegment(bip::create_only_t, char const* filename, size_t size) {
if (fs::exists(filename))
throw std::runtime_error("file already exists");
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
}
grow(filename, size);
fm = bip::file_mapping(filename, bip::mode_t::read_write);
auto offset = write_magic_header();
buf = bip::mapped_region(fm, bip::mode_t::read_write, offset,
size - offset);
mb = Segment(bip::create_only, buf.get_address(), buf.get_size());
auto* mgr = mb.get_segment_manager();
_vec = mb.find_or_construct<Database>("vec")(mgr);
_index = mb.find_or_construct<Index>("index")(mgr);
}
MySharedSegment(bip::open_only_t, char const* filename)
{
if (!fs::exists(filename))
throw std::runtime_error("file not found");
fm = bip::file_mapping(filename, bip::mode_t::read_write);
auto offset = check_magic_header();
buf = bip::mapped_region(fm, bip::mode_t::read_write, offset);
mb = Segment(bip::open_only, buf.get_address(), buf.get_size());
if (buf.get_size() > mb.get_size()) {
// also grow segment if buffer grew
mb.grow(buf.get_size() - mb.get_size());
}
auto [v, vok] = mb.find<Database>("vec");
auto [i, iok] = mb.find<Index>("index");
if (!(v && vok && i && iok)) {
throw std::runtime_error("an expected object was not found");
}
_vec = v;
_index = i;
}
static void grow(char const* filename, size_t extra) {
fs::resize_file(filename, std::filesystem::file_size(filename) + extra);
}
Database& database() {
assert(_vec);
return *_vec;
}
Index& index() {
assert(_index);
return *_index;
}
Segment::segment_manager* get_segment_manager() {
return mb.get_segment_manager();
}
private:
size_t write_magic_header() {
auto HLEN = v1_magic_header.size();
if (fs::file_size(fm.get_name()) < HLEN)
throw std::runtime_error("File short");
bip::mapped_region mr(fm, bip::mode_t::read_write, 0, HLEN);
auto out = reinterpret_cast<uint8_t*>(mr.get_address());
auto nxt =
std::copy(v1_magic_header.begin(), v1_magic_header.end(), out);
assert(size_t(nxt - out) == HLEN);
return HLEN;
}
size_t check_magic_header() {
auto HLEN = v1_magic_header.size();
if (fs::file_size(fm.get_name()) >= HLEN) {
bip::mapped_region mr(fm, bip::mode_t::read_only, 0, HLEN);
if (std::equal(
v1_magic_header.begin(), v1_magic_header.end(),
reinterpret_cast<uint8_t const*>(mr.get_address()))) {
return HLEN;
}
}
// TODO future adds newer versions with different on disk formats
throw std::runtime_error("Unknown database file format");
}
bip::file_mapping fm;
bip::mapped_region buf;
Segment mb;
Database* _vec = nullptr;
Index* _index = nullptr;
static constexpr std::array<uint8_t, 16> v1_magic_header = {
0x27, 0x65, 0xb6, 0xcb, 0x3a, 0x86, 0xf5, 0x48,
0xba, 0xa3, 0x2c, 0x49, 0x00, 0xdd, 0x6f, 0xde,
};
};
void create_initial(int size) {
MySharedSegment mss(bip::create_only, FILENAME, size);
auto* mgr = mss.get_segment_manager();
auto& db = mss.database();
db.emplace_back("one", mgr);
db.emplace_back("two", mgr);
db.emplace_back("three", mgr);
auto& index = mss.index();
for (auto& elem : db) {
index.emplace_back(&elem);
}
boost::sort(index | indirected);
for (auto el : index | indirected) {
std::cout << el << " ";
}
std::cout << "\n";
}
void offline_grow_with(int size) { MySharedSegment::grow(FILENAME, size); }
void reopen_and_verify() {
MySharedSegment mss(bip::open_only, FILENAME);
// none of the pointers in the index have become invalidated:
for (auto el : mss.index() | indirected) {
std::cout << el << " ";
}
std::cout << "\n";
}
int main()
{
std::remove(FILENAME);
create_initial(1_MB);
offline_grow_with(1_MB);
reopen_and_verify();
}
版画
one three two
one three two
备注
以上还有requires/assumes线下增长。您可以在控件 headers 中添加一个进程间共享互斥锁(在托管段缓冲区之外)并使用它来进行 reader-writer 锁定,以便其他方在请求增长时自动取消映射该段。
为了解决 managed_shared_memory segment without unmapping and remapping all the previous regions, I want to create an allocator that creates a new managed_shared_memory segment whenever there is not enough space in the previous segments. I have looked into the boost interprocess node allocators 的增长问题,但它们似乎不太适合解决这个问题。 boost 中是否有任何 class 或实用程序可以帮助解决此问题?
不是问题的直接匹配,而是评论线程的相关成果,这里我举一个例子,使用 managed_external_buffer 来实现对 on-disk 格式的更多控制(预见向后兼容的版本控制也许还有一些完整性验证)并展示了如何实现增长
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/managed_external_buffer.hpp>
#include <boost/interprocess/mapped_region.hpp>
// sample data structures:
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/containers/vector.hpp>
// filesystem stuff
#include <fcntl.h>
#include <filesystem>
#include <fstream>
#include <sys/stat.h>
// convenience:
static constexpr char const* FILENAME = "data.bin";
static auto operator""_MB(unsigned long long n) { return n << 20; }
#include <algorithm>
#include <boost/range/adaptor/indirected.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <iostream>
namespace bip = boost::interprocess;
namespace fs = std::filesystem;
using boost::adaptors::indirected;
using Segment = bip::managed_external_buffer;
template <typename T> using Alloc = bip::allocator<T, Segment::segment_manager>;
using String = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using Database = bip::vector<String, Alloc<String>>;
using StringPtr = bip::offset_ptr<String>;
using Index = bip::vector<StringPtr, Alloc<StringPtr>>;
struct MySharedSegment {
MySharedSegment(bip::create_only_t, char const* filename, size_t size) {
if (fs::exists(filename))
throw std::runtime_error("file already exists");
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
}
grow(filename, size);
fm = bip::file_mapping(filename, bip::mode_t::read_write);
auto offset = write_magic_header();
buf = bip::mapped_region(fm, bip::mode_t::read_write, offset,
size - offset);
mb = Segment(bip::create_only, buf.get_address(), buf.get_size());
auto* mgr = mb.get_segment_manager();
_vec = mb.find_or_construct<Database>("vec")(mgr);
_index = mb.find_or_construct<Index>("index")(mgr);
}
MySharedSegment(bip::open_only_t, char const* filename)
{
if (!fs::exists(filename))
throw std::runtime_error("file not found");
fm = bip::file_mapping(filename, bip::mode_t::read_write);
auto offset = check_magic_header();
buf = bip::mapped_region(fm, bip::mode_t::read_write, offset);
mb = Segment(bip::open_only, buf.get_address(), buf.get_size());
if (buf.get_size() > mb.get_size()) {
// also grow segment if buffer grew
mb.grow(buf.get_size() - mb.get_size());
}
auto [v, vok] = mb.find<Database>("vec");
auto [i, iok] = mb.find<Index>("index");
if (!(v && vok && i && iok)) {
throw std::runtime_error("an expected object was not found");
}
_vec = v;
_index = i;
}
static void grow(char const* filename, size_t extra) {
fs::resize_file(filename, std::filesystem::file_size(filename) + extra);
}
Database& database() {
assert(_vec);
return *_vec;
}
Index& index() {
assert(_index);
return *_index;
}
Segment::segment_manager* get_segment_manager() {
return mb.get_segment_manager();
}
private:
size_t write_magic_header() {
auto HLEN = v1_magic_header.size();
if (fs::file_size(fm.get_name()) < HLEN)
throw std::runtime_error("File short");
bip::mapped_region mr(fm, bip::mode_t::read_write, 0, HLEN);
auto out = reinterpret_cast<uint8_t*>(mr.get_address());
auto nxt =
std::copy(v1_magic_header.begin(), v1_magic_header.end(), out);
assert(size_t(nxt - out) == HLEN);
return HLEN;
}
size_t check_magic_header() {
auto HLEN = v1_magic_header.size();
if (fs::file_size(fm.get_name()) >= HLEN) {
bip::mapped_region mr(fm, bip::mode_t::read_only, 0, HLEN);
if (std::equal(
v1_magic_header.begin(), v1_magic_header.end(),
reinterpret_cast<uint8_t const*>(mr.get_address()))) {
return HLEN;
}
}
// TODO future adds newer versions with different on disk formats
throw std::runtime_error("Unknown database file format");
}
bip::file_mapping fm;
bip::mapped_region buf;
Segment mb;
Database* _vec = nullptr;
Index* _index = nullptr;
static constexpr std::array<uint8_t, 16> v1_magic_header = {
0x27, 0x65, 0xb6, 0xcb, 0x3a, 0x86, 0xf5, 0x48,
0xba, 0xa3, 0x2c, 0x49, 0x00, 0xdd, 0x6f, 0xde,
};
};
void create_initial(int size) {
MySharedSegment mss(bip::create_only, FILENAME, size);
auto* mgr = mss.get_segment_manager();
auto& db = mss.database();
db.emplace_back("one", mgr);
db.emplace_back("two", mgr);
db.emplace_back("three", mgr);
auto& index = mss.index();
for (auto& elem : db) {
index.emplace_back(&elem);
}
boost::sort(index | indirected);
for (auto el : index | indirected) {
std::cout << el << " ";
}
std::cout << "\n";
}
void offline_grow_with(int size) { MySharedSegment::grow(FILENAME, size); }
void reopen_and_verify() {
MySharedSegment mss(bip::open_only, FILENAME);
// none of the pointers in the index have become invalidated:
for (auto el : mss.index() | indirected) {
std::cout << el << " ";
}
std::cout << "\n";
}
int main()
{
std::remove(FILENAME);
create_initial(1_MB);
offline_grow_with(1_MB);
reopen_and_verify();
}
版画
one three two
one three two
备注
以上还有requires/assumes线下增长。您可以在控件 headers 中添加一个进程间共享互斥锁(在托管段缓冲区之外)并使用它来进行 reader-writer 锁定,以便其他方在请求增长时自动取消映射该段。