提升共享内存:char * 的向量
Boost shared memory : vector of char *
我尝试在 boost 容器中保存一个 char* 但失败了。我的 char* 是 2048 个内存块上的二进制数据。这些二进制数据是用ALSA录制的声音。
但是当我将它保存在共享内存字符串的向量中时,它会以某种方式发生变异,我不知道如何修复它。
编辑并大概回答:
ALSA 发送 void* 缓冲区,所以如果我可以创建一个 void* 的共享内存向量,我就可以做到这一点。所以我基本上需要创建一个 void* 向量,每个 void* 的大小必须固定(在本例中:2048)。我认为 boost::interprocess::basic_string
是问题所在
结束编辑
这里是完整的解释:
我正在尝试通过一个程序从 ALSA 的直接声音输入中收听,然后使用另一个程序将其写入文件(或处理其中的任何内容)
我从这个问题开始:Create a shared-memory vector of strings
现在我卡住了,我不太了解提升。
我创建了一个包含完整项目的 github (https://github.com/Waxo/nodetest)。 Alsa control with the method listen with callback 只需调用一个带有 (char* , int) 原型的方法。
构建项目后,当 ./nodetest
说 "Go"
时,您可以启动 ./nodetest
和 ./nodetest arg
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>
#include <iostream>
#include <alsa_control.h>
using std::cout;
using std::endl;
typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;
class lambda_class {
public:
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc;
this->output_file_.write(c, rc * 2);
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
this->output_file_.close();
this->output_file_.open(this->filename_,
std::ios::binary | std::ios::in);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
};
~input_class() {
lambda_class *lc = new lambda_class("listener_vector.wav");
char *c = (char *) malloc(2048);
for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
strcpy(c, it->c_str());
lc->lambda_callback(c, 2048);
}
delete lc;
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm->destroy_ptr(this->myshmvector);
}
void to_node(char *c, int rc) {
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
MyShmString mystring(charallocator);
mystring = c;
this->myshmvector->insert(this->myshmvector->begin(), mystring);
}
private:
boost::interprocess::managed_shared_memory *shm;
MyShmStringVector *myshmvector;
};
void listener() {
lambda_class *ctc = new lambda_class("writer.wav");
char *c = (char *) malloc(2048);
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
strcpy(c, std::string(it->begin(), it->end()).c_str());
ctc->lambda_callback(c, 2048);
}
delete ctc;
return;
}
int main(int argc, char **argv) {
alsa_control *ac = new alsa_control(16000, 2048, 16, MONO);
if (argc == 1) {
input_class *ic = new input_class();
ac->listen_with_callback(std::bind(&input_class::to_node, ic, std::placeholders::_1, std::placeholders::_2),
"listener");
sleep(5);
ac->stop();
cout << "Go" << endl;
sleep(10);
delete ic;
delete ac;
} else {
auto th = std::async(std::launch::async, listener);
th.get();
}
return 0;
}
我只是想在多个进程中使用我的声音,我可以使用和共享它的结构(我将为我的所有程序创建和组织)。
char*可以固定大小,能用就好了
编辑:
我的问题是录制的声音 :
- listener_vector.wav 来自同一进程
- writer.wav 来自其他程序
无效,我认为 MyShmString 将它们变异为无效的二进制数据。
问题不清楚,建议改进问题。
尽管如此,我在您的代码中看到一个错误:
您分配了包含 2048 个元素的数组
char *c = (char *) malloc(2048);
然后你在 lambda_callback()
中写两倍
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc;
this->output_file_.write(c, rc * 2);
}
您需要将 rc 而不是 rc * 2 传递给 write();
我没有检查你的代码墙,但在你的问题中我注意到你说:
I try to save an char* inside boost container
然后
My char* is binary data
然后
I think that the boost::interprocess::basic_string si the problem
现在,如果您指定了确切的类型而不是只说 "boost container" - 哪个容器,那将会有所帮助,但是如果您试图 将二进制数据存储在string class - NOOOOOOO,这不是你使用字符串的方式。您将文本存储在字符串中,而不是二进制数据。对于二进制数据,使用 std::vector<char>
.
特别是,如果您在代码中的任何地方使用类似的东西
string s = (char*) binary_data;
字符串 将在第一次遇到零时被截断 !
我已经修改了您的示例代码以使用 int8_t
的进程间 vector
,这可以看作是正确存储二进制数据的方法之一。
当然输出文件是错误的,因为这一行:
this->output_file_.write(c, rc * 2);
如果你想写两倍的数据,你需要调用 write
两次:
this->output_file_.write(c, rc);
this->output_file_.write(c, rc);
传递 rc * 2
使其在前 2048 字节后写入垃圾数据。
另一个潜在的问题是这一行:
this->myshmvector->insert(this->myshmvector->begin(), mystring);
在这里,您以相反的顺序存储数据。那是你的意图吗?在我的固定示例中,我将其更改为按顺序存储数据。
第三个潜在问题可能是@sashoalm 提到的二进制数据中的零。
这里是我修改的固定部分代码:
typedef boost::interprocess::allocator<int8_t, boost::interprocess::managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector<int8_t, ShmemAllocator> MyVector;
class lambda_class {
public:
void lambda_callback(int8_t *c, int rc) {
this->sample_count_ += rc;
//this->output_file_.write(c, rc * 2); // this is not going to work, to write twice as much data you need to call write() twice
this->output_file_.write(reinterpret_cast<char*>(c), rc);
//this->output_file_.write(reinterpret_cast<char*>(c), rc); // uncomment this line to write twice as much data
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
this->output_file_.close();
this->output_file_.open(this->filename_,
std::ios::binary | std::ios::in);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
const ShmemAllocator alloc_inst(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyVector>("myshmvector")(alloc_inst);
};
~input_class() {
lambda_class *lc = new lambda_class("listener_vector.wav");
int8_t *c = (int8_t *)malloc(2048);
for (MyVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it+=2048) {
memcpy(c, &*it, 2048);
lc->lambda_callback(c, 2048);
}
delete lc;
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm->destroy_ptr(this->myshmvector);
}
void to_node(int8_t *c, int rc) {
//this->myshmvector->insert(this->myshmvector->begin(), c, c+2048); // if your intention was to reverse the data then uncomment this line and comment out the line below
this->myshmvector->insert(this->myshmvector->end(), c, c + 2048); // storing data in order as it comes
}
private:
boost::interprocess::managed_shared_memory *shm;
MyVector *myshmvector;
};
void listener() {
lambda_class *ctc = new lambda_class("writer.wav");
int8_t *c = (int8_t *)malloc(2048);
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyVector *myvector = segment.find<MyVector>("myshmvector").first;
for (MyVector::iterator it = myvector->begin(); it != myvector->end(); it+=2048) {
memcpy(c, &*it, 2048);
ctc->lambda_callback(c, 2048);
}
delete ctc;
return;
}
除了其他人所说的,
您在样本 -> 字节数转换中出错。这导致一半的输出是未初始化的数据
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc;
this->output_file_.write(c, rc * 2);
}
变成
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc/2;
this->output_file_.write(c, rc);
}
NOTE see also rc*2
in to_node
below
您将字符串插入向量前面 - 这会导致音频块以相反的顺序输出。此外,对 mystring
的分配忽略了 rc
信息
void to_node(char *c, int rc) {
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
MyShmString mystring(charallocator);
mystring = c; // OOPS! this cuts off at any NUL char
this->myshmvector->insert(this->myshmvector->begin(), mystring);
}
变成
void to_node(char *c, int rc) {
_myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
}
在迭代共享向量时,您实际上应该使用字符串大小,而不是依赖于固定的缓冲区大小。此外 不要对二进制数据使用 strcpy
:
void listener() {
lambda_class ctc("writer.wav");
char *c = (char *) malloc(2048);
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
//strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
strcpy(c, it->c_str());
ctc.lambda_callback(c, 2048);
}
}
变成:
void listener() {
lambda_class ctc("writer.wav");
char c[4096];
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
//strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
assert(it->size()<=sizeof(c));
memcpy(c, it->c_str(), it->size());
ctc.lambda_callback(c, it->size());
}
}
不要在 c++ 中 malloc
。你泄露了那些
你的测试场景很活泼(但我假设你知道这一点,这是为了简单起见?)
而不是 "magic" 文件重新打开,只是 seekp
:
this->output_file_.close();
this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
变成
output_file_.seekp(0ul);
拉取请求 1
commit 660208ff3fe792112ccae70a61e8a2442a853664
Author: Seth Heeren <sgheeren@gmail.com>
Date: Mon Aug 17 00:13:31 2015 +0200
Fixes, now
./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
results in proper identical files, e.g.
$ ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
[1] 12350
Go
./nodetest
ff083a6344d7037da9c4f6d730239f30 listener_vector.wav
ff083a6344d7037da9c4f6d730239f30 listener.wav
ff083a6344d7037da9c4f6d730239f30 writer.wav
commit 04a5dec3da322dab196bfe568f52db6d9ed68b43
Author: Seth Heeren <sgheeren@gmail.com>
Date: Sun Aug 16 23:30:40 2015 +0200
we have repro
main.cc
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>
#include <iostream>
#include <alsa_control.h>
using std::cout;
using std::endl;
typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;
class lambda_class {
public:
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc/2;
this->output_file_.write(c, rc);
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
/*
*this->output_file_.close();
*this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
*/
output_file_.seekp(0ul);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
};
~input_class() {
lambda_class lc("listener_vector.wav");
char c[4096];
for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
assert(it->size()<=sizeof(c));
memcpy(c, it->c_str(), it->size());
lc.lambda_callback(c, it->size());
}
boost::interprocess::shared_memory_object::remove("MySharedMemory");
shm->destroy_ptr(this->myshmvector);
}
void to_node(char *c, int rc) {
this->myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
}
private:
boost::interprocess::managed_shared_memory *shm;
MyShmStringVector *myshmvector;
};
void listener() {
lambda_class ctc("writer.wav");
char c[4096];
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
//strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
assert(it->size()<=sizeof(c));
memcpy(c, it->c_str(), it->size());
ctc.lambda_callback(c, it->size());
}
}
int main(int argc, char **argv) {
alsa_control ac(16000, 2048, 16, MONO);
if (argc == 1) {
input_class ic;
ac.listen_with_callback(std::bind(&input_class::to_node, &ic, std::placeholders::_1, std::placeholders::_2), "listener");
sleep(5);
ac.stop();
cout << "Go" << endl;
sleep(10);
} else {
// std::atomic<bool> done(false);
auto th = std::async(std::launch::async, listener);
// done.store(true, std::memory_order_relaxed);
th.get();
}
return 0;
}
我尝试在 boost 容器中保存一个 char* 但失败了。我的 char* 是 2048 个内存块上的二进制数据。这些二进制数据是用ALSA录制的声音。
但是当我将它保存在共享内存字符串的向量中时,它会以某种方式发生变异,我不知道如何修复它。
编辑并大概回答:
ALSA 发送 void* 缓冲区,所以如果我可以创建一个 void* 的共享内存向量,我就可以做到这一点。所以我基本上需要创建一个 void* 向量,每个 void* 的大小必须固定(在本例中:2048)。我认为 boost::interprocess::basic_string
是问题所在
结束编辑
这里是完整的解释:
我正在尝试通过一个程序从 ALSA 的直接声音输入中收听,然后使用另一个程序将其写入文件(或处理其中的任何内容)
我从这个问题开始:Create a shared-memory vector of strings
现在我卡住了,我不太了解提升。 我创建了一个包含完整项目的 github (https://github.com/Waxo/nodetest)。 Alsa control with the method listen with callback 只需调用一个带有 (char* , int) 原型的方法。
构建项目后,当 ./nodetest
说 "Go"
./nodetest
和 ./nodetest arg
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>
#include <iostream>
#include <alsa_control.h>
using std::cout;
using std::endl;
typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;
class lambda_class {
public:
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc;
this->output_file_.write(c, rc * 2);
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
this->output_file_.close();
this->output_file_.open(this->filename_,
std::ios::binary | std::ios::in);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
};
~input_class() {
lambda_class *lc = new lambda_class("listener_vector.wav");
char *c = (char *) malloc(2048);
for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
strcpy(c, it->c_str());
lc->lambda_callback(c, 2048);
}
delete lc;
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm->destroy_ptr(this->myshmvector);
}
void to_node(char *c, int rc) {
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
MyShmString mystring(charallocator);
mystring = c;
this->myshmvector->insert(this->myshmvector->begin(), mystring);
}
private:
boost::interprocess::managed_shared_memory *shm;
MyShmStringVector *myshmvector;
};
void listener() {
lambda_class *ctc = new lambda_class("writer.wav");
char *c = (char *) malloc(2048);
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
strcpy(c, std::string(it->begin(), it->end()).c_str());
ctc->lambda_callback(c, 2048);
}
delete ctc;
return;
}
int main(int argc, char **argv) {
alsa_control *ac = new alsa_control(16000, 2048, 16, MONO);
if (argc == 1) {
input_class *ic = new input_class();
ac->listen_with_callback(std::bind(&input_class::to_node, ic, std::placeholders::_1, std::placeholders::_2),
"listener");
sleep(5);
ac->stop();
cout << "Go" << endl;
sleep(10);
delete ic;
delete ac;
} else {
auto th = std::async(std::launch::async, listener);
th.get();
}
return 0;
}
我只是想在多个进程中使用我的声音,我可以使用和共享它的结构(我将为我的所有程序创建和组织)。 char*可以固定大小,能用就好了
编辑:
我的问题是录制的声音 :
- listener_vector.wav 来自同一进程
- writer.wav 来自其他程序
无效,我认为 MyShmString 将它们变异为无效的二进制数据。
问题不清楚,建议改进问题。
尽管如此,我在您的代码中看到一个错误: 您分配了包含 2048 个元素的数组
char *c = (char *) malloc(2048);
然后你在 lambda_callback()
中写两倍 void lambda_callback(char *c, int rc) {
this->sample_count_ += rc;
this->output_file_.write(c, rc * 2);
}
您需要将 rc 而不是 rc * 2 传递给 write();
我没有检查你的代码墙,但在你的问题中我注意到你说:
I try to save an char* inside boost container
然后
My char* is binary data
然后
I think that the boost::interprocess::basic_string si the problem
现在,如果您指定了确切的类型而不是只说 "boost container" - 哪个容器,那将会有所帮助,但是如果您试图 将二进制数据存储在string class - NOOOOOOO,这不是你使用字符串的方式。您将文本存储在字符串中,而不是二进制数据。对于二进制数据,使用 std::vector<char>
.
特别是,如果您在代码中的任何地方使用类似的东西
string s = (char*) binary_data;
字符串 将在第一次遇到零时被截断 !
我已经修改了您的示例代码以使用 int8_t
的进程间 vector
,这可以看作是正确存储二进制数据的方法之一。
当然输出文件是错误的,因为这一行:
this->output_file_.write(c, rc * 2);
如果你想写两倍的数据,你需要调用 write
两次:
this->output_file_.write(c, rc);
this->output_file_.write(c, rc);
传递 rc * 2
使其在前 2048 字节后写入垃圾数据。
另一个潜在的问题是这一行:
this->myshmvector->insert(this->myshmvector->begin(), mystring);
在这里,您以相反的顺序存储数据。那是你的意图吗?在我的固定示例中,我将其更改为按顺序存储数据。
第三个潜在问题可能是@sashoalm 提到的二进制数据中的零。
这里是我修改的固定部分代码:
typedef boost::interprocess::allocator<int8_t, boost::interprocess::managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector<int8_t, ShmemAllocator> MyVector;
class lambda_class {
public:
void lambda_callback(int8_t *c, int rc) {
this->sample_count_ += rc;
//this->output_file_.write(c, rc * 2); // this is not going to work, to write twice as much data you need to call write() twice
this->output_file_.write(reinterpret_cast<char*>(c), rc);
//this->output_file_.write(reinterpret_cast<char*>(c), rc); // uncomment this line to write twice as much data
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
this->output_file_.close();
this->output_file_.open(this->filename_,
std::ios::binary | std::ios::in);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
const ShmemAllocator alloc_inst(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyVector>("myshmvector")(alloc_inst);
};
~input_class() {
lambda_class *lc = new lambda_class("listener_vector.wav");
int8_t *c = (int8_t *)malloc(2048);
for (MyVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it+=2048) {
memcpy(c, &*it, 2048);
lc->lambda_callback(c, 2048);
}
delete lc;
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm->destroy_ptr(this->myshmvector);
}
void to_node(int8_t *c, int rc) {
//this->myshmvector->insert(this->myshmvector->begin(), c, c+2048); // if your intention was to reverse the data then uncomment this line and comment out the line below
this->myshmvector->insert(this->myshmvector->end(), c, c + 2048); // storing data in order as it comes
}
private:
boost::interprocess::managed_shared_memory *shm;
MyVector *myshmvector;
};
void listener() {
lambda_class *ctc = new lambda_class("writer.wav");
int8_t *c = (int8_t *)malloc(2048);
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyVector *myvector = segment.find<MyVector>("myshmvector").first;
for (MyVector::iterator it = myvector->begin(); it != myvector->end(); it+=2048) {
memcpy(c, &*it, 2048);
ctc->lambda_callback(c, 2048);
}
delete ctc;
return;
}
除了其他人所说的,
您在样本 -> 字节数转换中出错。这导致一半的输出是未初始化的数据
void lambda_callback(char *c, int rc) { this->sample_count_ += rc; this->output_file_.write(c, rc * 2); }
变成
void lambda_callback(char *c, int rc) { this->sample_count_ += rc/2; this->output_file_.write(c, rc); }
NOTE see also
rc*2
into_node
below您将字符串插入向量前面 - 这会导致音频块以相反的顺序输出。此外,对
mystring
的分配忽略了rc
信息void to_node(char *c, int rc) { CharAllocator charallocator(this->shm->get_segment_manager()); StringAllocator stringallocator(this->shm->get_segment_manager()); MyShmString mystring(charallocator); mystring = c; // OOPS! this cuts off at any NUL char this->myshmvector->insert(this->myshmvector->begin(), mystring); }
变成
void to_node(char *c, int rc) { _myshmvector->emplace_back(c, rc*2, shm->get_segment_manager()); }
在迭代共享向量时,您实际上应该使用字符串大小,而不是依赖于固定的缓冲区大小。此外 不要对二进制数据使用
strcpy
:void listener() { lambda_class ctc("writer.wav"); char *c = (char *) malloc(2048); boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory"); MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first; for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) { //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?! strcpy(c, it->c_str()); ctc.lambda_callback(c, 2048); } }
变成:
void listener() { lambda_class ctc("writer.wav"); char c[4096]; boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory"); MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first; for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) { //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?! assert(it->size()<=sizeof(c)); memcpy(c, it->c_str(), it->size()); ctc.lambda_callback(c, it->size()); } }
不要在 c++ 中
malloc
。你泄露了那些你的测试场景很活泼(但我假设你知道这一点,这是为了简单起见?)
而不是 "magic" 文件重新打开,只是
seekp
:this->output_file_.close(); this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
变成
output_file_.seekp(0ul);
拉取请求 1
commit 660208ff3fe792112ccae70a61e8a2442a853664
Author: Seth Heeren <sgheeren@gmail.com>
Date: Mon Aug 17 00:13:31 2015 +0200
Fixes, now
./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
results in proper identical files, e.g.
$ ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
[1] 12350
Go
./nodetest
ff083a6344d7037da9c4f6d730239f30 listener_vector.wav
ff083a6344d7037da9c4f6d730239f30 listener.wav
ff083a6344d7037da9c4f6d730239f30 writer.wav
commit 04a5dec3da322dab196bfe568f52db6d9ed68b43
Author: Seth Heeren <sgheeren@gmail.com>
Date: Sun Aug 16 23:30:40 2015 +0200
we have repro
main.cc
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>
#include <iostream>
#include <alsa_control.h>
using std::cout;
using std::endl;
typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;
class lambda_class {
public:
void lambda_callback(char *c, int rc) {
this->sample_count_ += rc/2;
this->output_file_.write(c, rc);
}
lambda_class(std::string filename) {
this->filename_ = filename;
this->sample_count_ = 0;
this->output_file_.open(this->filename_, std::ios::binary);
write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
}
~lambda_class() {
/*
*this->output_file_.close();
*this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
*/
output_file_.seekp(0ul);
write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
}
private:
std::string filename_;
int sample_count_;
std::ofstream output_file_;
lambda_class(const lambda_class &a) = delete;
};
class input_class {
public:
input_class() {
boost::interprocess::shared_memory_object::remove("MySharedMemory");
this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
1000000);
CharAllocator charallocator(this->shm->get_segment_manager());
StringAllocator stringallocator(this->shm->get_segment_manager());
this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
};
~input_class() {
lambda_class lc("listener_vector.wav");
char c[4096];
for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
assert(it->size()<=sizeof(c));
memcpy(c, it->c_str(), it->size());
lc.lambda_callback(c, it->size());
}
boost::interprocess::shared_memory_object::remove("MySharedMemory");
shm->destroy_ptr(this->myshmvector);
}
void to_node(char *c, int rc) {
this->myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
}
private:
boost::interprocess::managed_shared_memory *shm;
MyShmStringVector *myshmvector;
};
void listener() {
lambda_class ctc("writer.wav");
char c[4096];
boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
//strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
assert(it->size()<=sizeof(c));
memcpy(c, it->c_str(), it->size());
ctc.lambda_callback(c, it->size());
}
}
int main(int argc, char **argv) {
alsa_control ac(16000, 2048, 16, MONO);
if (argc == 1) {
input_class ic;
ac.listen_with_callback(std::bind(&input_class::to_node, &ic, std::placeholders::_1, std::placeholders::_2), "listener");
sleep(5);
ac.stop();
cout << "Go" << endl;
sleep(10);
} else {
// std::atomic<bool> done(false);
auto th = std::async(std::launch::async, listener);
// done.store(true, std::memory_order_relaxed);
th.get();
}
return 0;
}