通过 Mmap-ed 共享内存传递可变长度的 C 字符串
Passing Variable Length C String Through Mmap-ed Shared Memory
假设我有一个进程A和一个进程B,进程A想通过shm_open() + mmap()共享内存将C字符串传递给进程B。
延迟效率最高的方法是什么?
这个的答案建议在C++11之后,std::atomic是通过共享内存共享数据的正确方法。
但是,我看不出如何编写类似这样的 C 字符串:
struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));
假设我有一个这样创建的共享内存:
class SHM {
char* _ptr;
public:
SHM() {
const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
const auto size = 4 * 1024 * 1024;
if (-1 == ftruncate(handle, size)) {
throw;
}
_ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
if(_ptr == MAP_FAILED){
throw;
}
int rc = fchmod(handle, 0666);
if (rc == -1) {
throw;
}
}
// assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
Buffer& getBuffer() noexcept {
return *reinrepret_cast<Buffer*>(_ptr);
}
Buffer& read() {
auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
while (buffer.size.load(std::memory_order_acquire) > 0) {
buffer.str.load(std::memory_order_relaxed);
return buffer;
}
}
};
SHM::getBuffer()
的调用者如何正确地逐个字符写入 Buffer::str 以便进程 B 可以调用 SHM::read()
进行检索?
buffer.str.load(std::memory_order_relaxed) 是否真正以原子方式正确加载?我对此表示怀疑,因为它甚至不知道长度。
这适用于 Linux、X86-64、GCC 7。
提前致谢。
这是单生产者单消费者案例的工作草图(producer/consumer 个线程是否来自同一进程并不重要),无等待:
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>
class SingleProducerSingleConsumerIndexes {
std::atomic<uint64_t> produced_ = {};
std::atomic<uint64_t> consumed_ = {};
public: // Producer interface.
uint64_t produced() {
auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
auto produced = produced_.load(std::memory_order_relaxed);
if(produced != consumed || !produced)
return produced;
// Entire buffer was consumed. Rewind.
produced_.store(0, std::memory_order_release); // Store 1.
consumed_.store(0, std::memory_order_relaxed); // Store 3.
return 0;
}
void produce(uint64_t end) {
produced_.store(end, std::memory_order_release); // Store 1.
}
public: // Consumer interface.
std::pair<uint64_t, uint64_t> available() const {
auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
auto consumed = consumed_.load(std::memory_order_relaxed);
// min handles the case of store 3 not visible yet.
return {std::min(produced, consumed), produced};
}
void consume(uint64_t end) {
consumed_.store(end, std::memory_order_release); // Store 2.
}
};
class SharedMemoryStrings {
void* p_;
static constexpr int size = 4 * 1024 * 1024;
static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
SharedMemoryStrings() {
auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
if(-1 == ::ftruncate(handle, size))
throw;
p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
::close(handle);
if(p_ == MAP_FAILED)
throw;
}
~SharedMemoryStrings() {
::munmap(p_, size);
}
void produce(std::string const& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto produced = indexes->produced();
uint64_t new_end = produced + sizeof(uint64_t) + s.size();
if(new_end > buffer_size)
throw; // Out of buffer space.
auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
uint64_t size = s.size();
memcpy(buffer, &size, sizeof size);
buffer += sizeof size;
memcpy(buffer, s.data(), s.size());
indexes->produce(new_end);
}
bool try_consume(std::string& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto available = indexes->available();
auto consumed = available.first;
auto produced = available.second;
if(consumed == produced)
return false; // No data available.
auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
uint64_t size;
memcpy(&size, buffer, sizeof size);
buffer += sizeof size;
// Reuse the string to minimize memory allocations.
s.assign(buffer, size);
indexes->consume(consumed + sizeof(uint64_t) + size);
return true;
}
};
int main(int ac, char** av) {
if(ac > 1) {
// Producer.
SharedMemoryStrings a;
for(int i = 1; i < ac; ++i)
a.produce(av[i]);
}
else {
// Consumer.
SharedMemoryStrings a;
for(std::string s;;) { // Busy-wait loop.
if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
printf("%s\n", s.c_str());
// else // Potential optimization.
// _mm_pause();
}
}
}
备注:
像g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread -lrt test.cc
一样编译代码。假设此来源称为 test.cc
.
不带参数启动消费者,./test
。带有参数的生产者,例如 ./test hello world
。启动顺序无关紧要。
它是单一生产者单一消费者解决方案。它是 wait-free(生产者和消费者调用在固定数量的指令中完成,没有循环),这比 lock-free 要好(这不能保证完成固定数量的指令)。不能再快了。
在 x86-64 上,这些获取和释放原子加载和存储编译成简单的 mov
指令,因为当前的 x86-64 内存模型有点太强大了。但是,使用 std::atomic
和特定的内存顺序可确保编译器不会对指令重新排序。它还确保代码在内存模型较弱的体系结构上编译和工作正常,并在必要时插入适当的屏障,这是 volatile
不可能做到的。例如,像 PowerPC。使用 volatile
与使用 std::memory_order_relaxed
相同。见 the assembly comparison.
produced_.store(end, std::memory_order_release);
确保一旦此存储的效果可见,生产者线程所做的所有先前存储(memcpy
进入共享内存)对消费者线程可见通过 produced_.load(std::memory_order_acquire);
。见 http://preshing.com/20130823/the-synchronizes-with-relation/ for thorough treatment of the subject. Also std::memory_order
说得最好:
memory_order_acquire
A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread.
memory_order_release
A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store. All writes in the current thread are visible in other threads that acquire the same atomic variable and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.
生产者检测消费者何时消费完所有可用数据。在这种情况下,生产者将缓冲区倒回开头。这样做是为了避免处理环形缓冲区的缓冲区包装。如果消费者不能足够快地处理消息,无论如何缓冲区最终都会变满。
它从不调用 SingleProducerSingleConsumerIndexes
构造函数。它依赖于一个新文件是零初始化的事实,而这正是构造函数要做的。在更复杂的场景中,如果文件刚刚创建,则需要调用共享数据的构造函数。这可以通过首先创建一个具有唯一名称的临时文件(如果该文件尚不存在),将文件映射到内存并调用构造函数来完成。然后将该临时文件重命名为最终名称(rename
是原子的)。如果重命名失败是因为文件已经存在,请删除临时文件并重新开始。
消费者忙等待尽可能低的延迟。如果您希望消费者在等待时阻塞,可以添加进程共享互斥锁和条件变量来实现这一点。不过,唤醒内核中等待条件变量(Linux 中的 futex)的线程需要几微秒。这将需要调用 SingleProducerSingleConsumerIndexes
构造函数来完成所有必需的初始化(例如,初始化一个健壮的自适应进程共享互斥锁和一个进程共享条件变量)。
假设我有一个进程A和一个进程B,进程A想通过shm_open() + mmap()共享内存将C字符串传递给进程B。
延迟效率最高的方法是什么?
这个
但是,我看不出如何编写类似这样的 C 字符串:
struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));
假设我有一个这样创建的共享内存:
class SHM {
char* _ptr;
public:
SHM() {
const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
const auto size = 4 * 1024 * 1024;
if (-1 == ftruncate(handle, size)) {
throw;
}
_ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
if(_ptr == MAP_FAILED){
throw;
}
int rc = fchmod(handle, 0666);
if (rc == -1) {
throw;
}
}
// assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
Buffer& getBuffer() noexcept {
return *reinrepret_cast<Buffer*>(_ptr);
}
Buffer& read() {
auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
while (buffer.size.load(std::memory_order_acquire) > 0) {
buffer.str.load(std::memory_order_relaxed);
return buffer;
}
}
};
SHM::getBuffer()
的调用者如何正确地逐个字符写入 Buffer::str 以便进程 B 可以调用 SHM::read()
进行检索?
buffer.str.load(std::memory_order_relaxed) 是否真正以原子方式正确加载?我对此表示怀疑,因为它甚至不知道长度。
这适用于 Linux、X86-64、GCC 7。
提前致谢。
这是单生产者单消费者案例的工作草图(producer/consumer 个线程是否来自同一进程并不重要),无等待:
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>
class SingleProducerSingleConsumerIndexes {
std::atomic<uint64_t> produced_ = {};
std::atomic<uint64_t> consumed_ = {};
public: // Producer interface.
uint64_t produced() {
auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
auto produced = produced_.load(std::memory_order_relaxed);
if(produced != consumed || !produced)
return produced;
// Entire buffer was consumed. Rewind.
produced_.store(0, std::memory_order_release); // Store 1.
consumed_.store(0, std::memory_order_relaxed); // Store 3.
return 0;
}
void produce(uint64_t end) {
produced_.store(end, std::memory_order_release); // Store 1.
}
public: // Consumer interface.
std::pair<uint64_t, uint64_t> available() const {
auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
auto consumed = consumed_.load(std::memory_order_relaxed);
// min handles the case of store 3 not visible yet.
return {std::min(produced, consumed), produced};
}
void consume(uint64_t end) {
consumed_.store(end, std::memory_order_release); // Store 2.
}
};
class SharedMemoryStrings {
void* p_;
static constexpr int size = 4 * 1024 * 1024;
static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
SharedMemoryStrings() {
auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
if(-1 == ::ftruncate(handle, size))
throw;
p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
::close(handle);
if(p_ == MAP_FAILED)
throw;
}
~SharedMemoryStrings() {
::munmap(p_, size);
}
void produce(std::string const& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto produced = indexes->produced();
uint64_t new_end = produced + sizeof(uint64_t) + s.size();
if(new_end > buffer_size)
throw; // Out of buffer space.
auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
uint64_t size = s.size();
memcpy(buffer, &size, sizeof size);
buffer += sizeof size;
memcpy(buffer, s.data(), s.size());
indexes->produce(new_end);
}
bool try_consume(std::string& s) {
auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
auto available = indexes->available();
auto consumed = available.first;
auto produced = available.second;
if(consumed == produced)
return false; // No data available.
auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
uint64_t size;
memcpy(&size, buffer, sizeof size);
buffer += sizeof size;
// Reuse the string to minimize memory allocations.
s.assign(buffer, size);
indexes->consume(consumed + sizeof(uint64_t) + size);
return true;
}
};
int main(int ac, char** av) {
if(ac > 1) {
// Producer.
SharedMemoryStrings a;
for(int i = 1; i < ac; ++i)
a.produce(av[i]);
}
else {
// Consumer.
SharedMemoryStrings a;
for(std::string s;;) { // Busy-wait loop.
if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
printf("%s\n", s.c_str());
// else // Potential optimization.
// _mm_pause();
}
}
}
备注:
像
g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread -lrt test.cc
一样编译代码。假设此来源称为test.cc
.不带参数启动消费者,
./test
。带有参数的生产者,例如./test hello world
。启动顺序无关紧要。它是单一生产者单一消费者解决方案。它是 wait-free(生产者和消费者调用在固定数量的指令中完成,没有循环),这比 lock-free 要好(这不能保证完成固定数量的指令)。不能再快了。
在 x86-64 上,这些获取和释放原子加载和存储编译成简单的
mov
指令,因为当前的 x86-64 内存模型有点太强大了。但是,使用std::atomic
和特定的内存顺序可确保编译器不会对指令重新排序。它还确保代码在内存模型较弱的体系结构上编译和工作正常,并在必要时插入适当的屏障,这是volatile
不可能做到的。例如,像 PowerPC。使用volatile
与使用std::memory_order_relaxed
相同。见 the assembly comparison.produced_.store(end, std::memory_order_release);
确保一旦此存储的效果可见,生产者线程所做的所有先前存储(memcpy
进入共享内存)对消费者线程可见通过produced_.load(std::memory_order_acquire);
。见 http://preshing.com/20130823/the-synchronizes-with-relation/ for thorough treatment of the subject. Alsostd::memory_order
说得最好:memory_order_acquire
A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread.memory_order_release
A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store. All writes in the current thread are visible in other threads that acquire the same atomic variable and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.生产者检测消费者何时消费完所有可用数据。在这种情况下,生产者将缓冲区倒回开头。这样做是为了避免处理环形缓冲区的缓冲区包装。如果消费者不能足够快地处理消息,无论如何缓冲区最终都会变满。
它从不调用
SingleProducerSingleConsumerIndexes
构造函数。它依赖于一个新文件是零初始化的事实,而这正是构造函数要做的。在更复杂的场景中,如果文件刚刚创建,则需要调用共享数据的构造函数。这可以通过首先创建一个具有唯一名称的临时文件(如果该文件尚不存在),将文件映射到内存并调用构造函数来完成。然后将该临时文件重命名为最终名称(rename
是原子的)。如果重命名失败是因为文件已经存在,请删除临时文件并重新开始。消费者忙等待尽可能低的延迟。如果您希望消费者在等待时阻塞,可以添加进程共享互斥锁和条件变量来实现这一点。不过,唤醒内核中等待条件变量(Linux 中的 futex)的线程需要几微秒。这将需要调用
SingleProducerSingleConsumerIndexes
构造函数来完成所有必需的初始化(例如,初始化一个健壮的自适应进程共享互斥锁和一个进程共享条件变量)。