Boost 共享内存:char* 的向量

Boost shared memory : vector of char *

我尝试在 boost 容器中保存一个 char* 但失败了。我的 char* 是 2048 个内存块上的二进制数据。这些二进制数据是用ALSA录制的声音。
但是当我将它保存在共享内存字符串的向量中时,它会以某种方式发生变异,我不知道如何修复它。

编辑并大概回答:
ALSA 发送 void* 缓冲区,所以如果我可以创建一个 void* 的共享内存向量,我就可以做到这一点。所以我基本上需要创建一个 void* 向量,每个 void* 的大小必须固定(在本例中:2048)。我认为 boost::interprocess::basic_string 是问题所在
结束编辑

这里是完整的解释:

我正在尝试通过一个程序从 ALSA 的直接声音输入中收听,然后使用另一个程序将其写入文件(或处理其中的任何内容)

我从这个问题开始:Create a shared-memory vector of strings

现在我卡住了,我不太了解提升。 我创建了一个包含完整项目的 github (https://github.com/Waxo/nodetest)。 Alsa control with the method listen with callback 只需调用一个带有 (char* , int) 原型的方法。

构建项目后,先启动 ./nodetest;当它输出 "Go" 时,再启动 ./nodetest arg:

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>

#include <iostream>
#include <alsa_control.h>

using std::cout;
using std::endl;

typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;


// Writes audio chunks to a WAV file. A header is written when the file is
// opened and written again, with the accumulated sample count, on destruction.
// write_header_wav and MONO are not defined in this listing (presumably they
// come from alsa_control.h — TODO confirm).
class lambda_class {
public:
    // Adds rc to the running sample count and writes rc * 2 bytes from c.
    // NOTE(review): this requires c to point to at least rc * 2 readable
    // bytes, i.e. rc is treated as a count of 2-byte samples, not a byte
    // count — confirm against every caller.
    void lambda_callback(char *c, int rc) {
      this->sample_count_ += rc;
      this->output_file_.write(c, rc * 2);
    }

    // Opens `filename` for binary output and writes a header with a
    // placeholder count of 10000. The result of open() is not checked.
    lambda_class(std::string filename) {
      this->filename_ = filename;
      this->sample_count_ = 0;
      this->output_file_.open(this->filename_, std::ios::binary);
      write_header_wav(this->output_file_, 16000, 16, MONO, 10000);
    }

    // Closes the file, reopens it with std::ios::in added (so the existing
    // contents are kept rather than truncated) and calls write_header_wav
    // again, this time with the accumulated sample count.
    ~lambda_class() {
      this->output_file_.close();
      this->output_file_.open(this->filename_,
                              std::ios::binary | std::ios::in);
      write_header_wav(this->output_file_, 16000, 16, MONO, this->sample_count_);
    }

private:
    std::string filename_;       // path passed to the constructor
    int sample_count_;           // sum of every rc passed to lambda_callback
    std::ofstream output_file_;  // output stream for the WAV file
    // Copying is disabled.
    lambda_class(const lambda_class &a) = delete;
};

// Producer side: creates the "MySharedMemory" segment and a shared vector of
// shared-memory strings named "myshmvector", and stores incoming buffers in it.
class input_class {
public:
    // Removes any existing segment with the same name, creates a new
    // 1000000-byte segment and constructs the (empty) vector inside it.
    input_class() {
      boost::interprocess::shared_memory_object::remove("MySharedMemory");
      this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
                                                                 1000000);
      // charallocator is constructed but not used in this function.
      CharAllocator charallocator(this->shm->get_segment_manager());
      StringAllocator stringallocator(this->shm->get_segment_manager());
      this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
    };

    // Dumps every stored string to "listener_vector.wav", then removes the
    // segment name and destroys the vector.
    ~input_class() {
      lambda_class *lc = new lambda_class("listener_vector.wav");
      // 2048-byte scratch buffer; it is never freed in this function.
      char *c = (char *) malloc(2048);
      for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
        // strcpy copies only up to the first NUL byte of the stored string and
        // does not check that the result fits in the 2048-byte buffer.
        strcpy(c, it->c_str());
        // 2048 is passed as rc regardless of how many bytes were copied.
        lc->lambda_callback(c, 2048);
      }
      delete lc;
      // The name is removed before the vector is destroyed; this->shm itself
      // is never deleted in this class.
      boost::interprocess::shared_memory_object::remove("MySharedMemory");
      this->shm->destroy_ptr(this->myshmvector);
    }

    // Stores one incoming buffer as a shared-memory string.
    // rc is not used in this function.
    void to_node(char *c, int rc) {
      CharAllocator charallocator(this->shm->get_segment_manager());
      // stringallocator is constructed but not used in this function.
      StringAllocator stringallocator(this->shm->get_segment_manager());
      MyShmString mystring(charallocator);
      // Assignment from a char* treats c as a C string: copying stops at the
      // first NUL byte.
      mystring = c;
      // Inserting at begin() places the newest string first, so the vector
      // holds the strings in reverse arrival order.
      this->myshmvector->insert(this->myshmvector->begin(), mystring);
    }

private:
    boost::interprocess::managed_shared_memory *shm;  // allocated with new in the constructor
    MyShmStringVector *myshmvector;                    // points into the shared segment
};

// Consumer side: opens the existing "MySharedMemory" segment, looks up
// "myshmvector" and passes every stored string to a lambda_class that writes
// "writer.wav".
void listener() {
  lambda_class *ctc = new lambda_class("writer.wav");
  // 2048-byte scratch buffer; it is never freed in this function.
  char *c = (char *) malloc(2048);
  boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
  // .first of the find() result is used below without a null check.
  MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;


  for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
    // The element is copied into a temporary std::string; strcpy then copies
    // only up to that string's first NUL byte into c.
    strcpy(c, std::string(it->begin(), it->end()).c_str());
    // 2048 is passed as rc regardless of how many bytes were copied.
    ctc->lambda_callback(c, 2048);
  }

  // Deleting ctc runs the lambda_class destructor.
  delete ctc;
  return;
}

// Entry point. With no command-line argument the process acts as the producer
// (records through alsa_control into shared memory); with any argument it acts
// as the consumer and runs listener().
int main(int argc, char **argv) {
  // alsa_control is declared outside this listing.
  // NOTE(review): the arguments presumably mean sample rate, frames per
  // period, bits per sample and channel layout — confirm in alsa_control.h.
  // In the consumer branch this object is created but never deleted.
  alsa_control *ac = new alsa_control(16000, 2048, 16, MONO);

  if (argc == 1) {

    input_class *ic = new input_class();
    // Every (char*, int) callback from alsa_control is forwarded to ic->to_node.
    ac->listen_with_callback(std::bind(&input_class::to_node, ic, std::placeholders::_1, std::placeholders::_2),
                             "listener");
    sleep(5);
    ac->stop();
    // "Go" tells the user that the consumer process can be started now.
    cout << "Go" << endl;
    // Keeps the producer (and therefore the shared segment) alive for 10 seconds.
    sleep(10);

    delete ic;
    delete ac;
  } else {
    // Runs listener() on another thread and waits for it to finish.
    auto th = std::async(std::launch::async, listener);
    th.get();
  }


  return 0;
}

我只是想在多个进程中使用我的声音,我可以使用和共享它的结构(我将为我的所有程序创建和组织)。 char*可以固定大小,能用就好了

编辑:
我的问题是:录制出来的声音文件无效,我认为 MyShmString 把数据变异成了无效的二进制数据。

问题不清楚,建议改进问题。

尽管如此,我在您的代码中看到一个错误: 您分配了包含 2048 个元素的数组

char *c = (char *) malloc(2048);

然后你在 lambda_callback() 中写入了两倍于此的字节数:
 // Quoted from the question: adds rc to the sample count and writes rc * 2
 // bytes starting at c, so c must point to at least rc * 2 readable bytes.
 void lambda_callback(char *c, int rc) {
   this->sample_count_ += rc;
   this->output_file_.write(c, rc * 2);
 }

您需要将 rc 而不是 rc * 2 传递给 write();

我没有检查你的代码墙,但在你的问题中我注意到你说:

I try to save an char* inside boost container

然后

My char* is binary data

然后

I think that the boost::interprocess::basic_string si the problem

现在,如果您指定了确切的类型而不是只说 "boost container" - 哪个容器,那将会有所帮助,但是如果您试图 将二进制数据存储在string class - NOOOOOOO,这不是你使用字符串的方式。您将文本存储在字符串中,而不是二进制数据。对于二进制数据,使用 std::vector<char>.

特别是,如果您在代码中的任何地方使用类似的东西

string s = (char*) binary_data;

字符串将在第一次遇到零字节(NUL)时被截断!

我已经修改了您的示例代码以使用 int8_t 的进程间 vector,这可以看作是正确存储二进制数据的方法之一。

当然输出文件是错误的,因为这一行: this->output_file_.write(c, rc * 2); 如果你想写两倍的数据,你需要调用 write 两次:

this->output_file_.write(c, rc);
this->output_file_.write(c, rc);

传递 rc * 2 使其在前 2048 字节后写入垃圾数据。

另一个潜在的问题是这一行: this->myshmvector->insert(this->myshmvector->begin(), mystring); 在这里,您以相反的顺序存储数据。那是你的意图吗?在我修复后的示例中,我将其更改为按顺序存储数据。

第三个潜在问题可能是@sashoalm 提到的二进制数据中的零。

这里是我修改后的代码中已修复的部分:

typedef boost::interprocess::allocator<int8_t, boost::interprocess::managed_shared_memory::segment_manager>  ShmemAllocator;
typedef boost::interprocess::vector<int8_t, ShmemAllocator> MyVector;


// Writes audio bytes to a WAV file. A header with a placeholder count is
// written on construction and written again with the accumulated count on
// destruction. write_header_wav and MONO are declared outside this listing.
class lambda_class {
public:
    // Opens `filename` for binary output and emits the provisional header.
    lambda_class(std::string filename)
        : filename_(filename), sample_count_(0), output_file_(filename_, std::ios::binary) {
        write_header_wav(output_file_, 16000, 16, MONO, 10000);
    }

    // Reopens the file without truncating it and writes the header again with
    // the accumulated count.
    ~lambda_class() {
        output_file_.close();
        output_file_.open(filename_, std::ios::binary | std::ios::in);
        write_header_wav(output_file_, 16000, 16, MONO, sample_count_);
    }

    // Adds rc to the running count and writes exactly rc bytes from c.
    // Passing rc * 2 to a single write() would read past a buffer that only
    // holds rc bytes; to emit the data twice, call write() a second time with
    // the same arguments instead.
    void lambda_callback(int8_t *c, int rc) {
        sample_count_ += rc;
        char *bytes = reinterpret_cast<char*>(c);
        output_file_.write(bytes, rc);
    }

private:
    std::string filename_;
    int sample_count_;
    std::ofstream output_file_;
    lambda_class(const lambda_class &a) = delete;
};

class input_class {
public:
    input_class() {
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
            1000000);
        const ShmemAllocator alloc_inst(this->shm->get_segment_manager());
        this->myshmvector = shm->construct<MyVector>("myshmvector")(alloc_inst);
    };

    ~input_class() {
        lambda_class *lc = new lambda_class("listener_vector.wav");
        int8_t *c = (int8_t *)malloc(2048);
        for (MyVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it+=2048) {
            memcpy(c, &*it, 2048);
            lc->lambda_callback(c, 2048);
        }
        delete lc;
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm->destroy_ptr(this->myshmvector);
    }

    void to_node(int8_t *c, int rc) {
        //this->myshmvector->insert(this->myshmvector->begin(), c, c+2048); // if your intention was to reverse the data then uncomment this line and comment out the line below
        this->myshmvector->insert(this->myshmvector->end(), c, c + 2048); // storing data in order as it comes
    }

private:
    boost::interprocess::managed_shared_memory *shm;
    MyVector *myshmvector;
};

void listener() {
    lambda_class *ctc = new lambda_class("writer.wav");
    int8_t *c = (int8_t *)malloc(2048);
    boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
    MyVector *myvector = segment.find<MyVector>("myshmvector").first;


    for (MyVector::iterator it = myvector->begin(); it != myvector->end(); it+=2048) {
        memcpy(c, &*it, 2048);
        ctc->lambda_callback(c, 2048);
    }

    delete ctc;
    return;
}

除了其他人所说的,

  • 您在样本 -> 字节数转换中出错。这导致一半的输出是未初始化的数据

    // Original version: counts rc samples but writes rc * 2 bytes from c, so
    // c must point to at least rc * 2 readable bytes.
    void lambda_callback(char *c, int rc) {
        this->sample_count_ += rc;
        this->output_file_.write(c, rc * 2);
    }
    

    变成

    void lambda_callback(char *c, int rc) {
        this->sample_count_ += rc/2;
        this->output_file_.write(c, rc);
    }
    

    NOTE see also rc*2 in to_node below

  • 您将字符串插入到向量的前面——这会导致音频块以相反的顺序输出。此外,对 mystring 的赋值忽略了 rc(长度)信息

    // Original version: stores the buffer as a shared-memory string and
    // inserts it at the front of the vector. rc is never used.
    void to_node(char *c, int rc) {
        CharAllocator charallocator(this->shm->get_segment_manager());
        // stringallocator is constructed but not used in this function.
        StringAllocator stringallocator(this->shm->get_segment_manager());
        MyShmString mystring(charallocator);
        mystring = c; // OOPS! this cuts off at any NUL char
        // Inserting at begin() leaves the vector in reverse arrival order.
        this->myshmvector->insert(this->myshmvector->begin(), mystring);
    }
    

    变成

    void to_node(char *c, int rc) {
        _myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
    }
    
  • 在迭代共享向量时,您实际上应该使用字符串大小,而不是依赖于固定的缓冲区大小。此外 不要对二进制数据使用 strcpy:

    // Original approach: copies each stored string into a fixed 2048-byte
    // buffer with strcpy and always reports 2048 to the writer.
    void listener() {
        lambda_class ctc("writer.wav");
    
        // This buffer is never freed in this function.
        char *c = (char *) malloc(2048);
    
        boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
        MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
    
    
        for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
            //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
            // strcpy stops at the first NUL byte and does not check the buffer size.
            strcpy(c, it->c_str());
            // 2048 is passed regardless of the stored string's actual size.
            ctc.lambda_callback(c, 2048);
        }
    }
    

    变成:

    void listener() {
        lambda_class ctc("writer.wav");
    
        char c[4096];
    
        boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
        MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;
    
        for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
            //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
            assert(it->size()<=sizeof(c));
            memcpy(c, it->c_str(), it->size());
            ctc.lambda_callback(c, it->size());
        }
    }
    
  • 不要在 c++ 中 malloc。你泄露了那些

  • 你的测试场景存在竞态条件(racy)(但我假设你知道这一点,这只是为了简单起见?)

  • 不要用“魔法”般的方式关闭再重新打开文件,直接用 seekp 即可:

    this->output_file_.close();
    this->output_file_.open(this->filename_, std::ios::binary | std::ios::in);
    

    变成

    output_file_.seekp(0ul);
    

拉取请求 1

Pull Request on GitHub

commit 660208ff3fe792112ccae70a61e8a2442a853664
Author: Seth Heeren <sgheeren@gmail.com>
Date:   Mon Aug 17 00:13:31 2015 +0200

    Fixes, now

        ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav

    results in proper identical files, e.g.

        $ ./nodetest & sleep 6; ./nodetest 1; fg; md5sum *.wav
        [1] 12350
        Go
        ./nodetest
        ff083a6344d7037da9c4f6d730239f30  listener_vector.wav
        ff083a6344d7037da9c4f6d730239f30  listener.wav
        ff083a6344d7037da9c4f6d730239f30  writer.wav

commit 04a5dec3da322dab196bfe568f52db6d9ed68b43
Author: Seth Heeren <sgheeren@gmail.com>
Date:   Sun Aug 16 23:30:40 2015 +0200

    we have repro

main.cc

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <iostream>
#include <atomic>
#include <future>

#include <iostream>
#include <alsa_control.h>

using std::cout;
using std::endl;

typedef boost::interprocess::allocator<char, boost::interprocess::managed_shared_memory::segment_manager> CharAllocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, CharAllocator> MyShmString;
typedef boost::interprocess::allocator<MyShmString, boost::interprocess::managed_shared_memory::segment_manager> StringAllocator;
typedef boost::interprocess::vector<MyShmString, StringAllocator> MyShmStringVector;


// Writes audio bytes to a WAV file. A header with a placeholder count is
// written on construction and overwritten with the accumulated sample count on
// destruction. write_header_wav and MONO are declared outside this listing.
class lambda_class {
public:
    // Opens `filename` for binary output and emits the provisional header.
    lambda_class(std::string filename)
        : filename_(filename), sample_count_(0), output_file_(filename_, std::ios::binary) {
        write_header_wav(output_file_, 16000, 16, MONO, 10000);
    }

    // Seeks back to the start of the still-open stream and writes the header
    // again with the real count; no close/reopen is needed.
    ~lambda_class() {
        output_file_.seekp(0ul);
        write_header_wav(output_file_, 16000, 16, MONO, sample_count_);
    }

    // rc is a byte count: exactly rc bytes are written from c and the sample
    // counter advances by rc / 2.
    void lambda_callback(char *c, int rc) {
        output_file_.write(c, rc);
        sample_count_ += rc / 2;
    }

private:
    std::string filename_;
    int sample_count_;
    std::ofstream output_file_;
    lambda_class(const lambda_class &a) = delete;
};

class input_class {
public:
    input_class() {
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        this->shm = new boost::interprocess::managed_shared_memory(boost::interprocess::create_only, "MySharedMemory",
                1000000);
        CharAllocator charallocator(this->shm->get_segment_manager());
        StringAllocator stringallocator(this->shm->get_segment_manager());
        this->myshmvector = shm->construct<MyShmStringVector>("myshmvector")(stringallocator);
    };

    ~input_class() {
        lambda_class lc("listener_vector.wav");
        char c[4096];
        for (MyShmStringVector::iterator it = this->myshmvector->begin(); it != this->myshmvector->end(); it++) {
            assert(it->size()<=sizeof(c));
            memcpy(c, it->c_str(), it->size());
            lc.lambda_callback(c, it->size());
        }
        boost::interprocess::shared_memory_object::remove("MySharedMemory");
        shm->destroy_ptr(this->myshmvector);
    }

    void to_node(char *c, int rc) {
        this->myshmvector->emplace_back(c, rc*2, shm->get_segment_manager());
    }

private:
    boost::interprocess::managed_shared_memory *shm;
    MyShmStringVector *myshmvector;
};

void listener() {
    lambda_class ctc("writer.wav");

    char c[4096];

    boost::interprocess::managed_shared_memory segment(boost::interprocess::open_only, "MySharedMemory");
    MyShmStringVector *myvector = segment.find<MyShmStringVector>("myshmvector").first;

    for (MyShmStringVector::iterator it = myvector->begin(); it != myvector->end(); it++) {
        //strcpy(c, std::string(it->begin(), it->end()).c_str()); //HUHUH!?!
        assert(it->size()<=sizeof(c));
        memcpy(c, it->c_str(), it->size());
        ctc.lambda_callback(c, it->size());
    }
}

int main(int argc, char **argv) {
    alsa_control ac(16000, 2048, 16, MONO);

    if (argc == 1) {
        input_class ic;
        ac.listen_with_callback(std::bind(&input_class::to_node, &ic, std::placeholders::_1, std::placeholders::_2), "listener");
        sleep(5);
        ac.stop();
        cout << "Go" << endl;
        sleep(10);
    } else {
        //    std::atomic<bool> done(false);
        auto th = std::async(std::launch::async, listener);
        //    done.store(true, std::memory_order_relaxed);
        th.get();
    }
    return 0;
}