GNU Radio 循环缓冲区操作

GNU Radio circular buffer manipulation

我遇到了以下错误

gr::log :WARN: tpb_thread_body - asynchronous message buffer overflowing, dropping message

机缘巧合之下,我在 YouTube 上看到了这个 GNU Radio 演示。

主持人提到了一个他称为 "buffer" 的 OOT 块,它能够消除 "buffer overflowing" 错误。显然,这个块使用不同的采样率和所谓的 "circular buffers"。我自己没有使用过循环缓冲区。欢迎任何有关循环缓冲区的想法或有关如何构建此缓冲区块的任何提示。

编辑

下面是产生错误的流程图。正如评论中所建议的那样,罪魁祸首可能是消息处理块(红色圆圈),即 generateCADU(用于生成标准 CCSDS 帧)和 processCADU(用于从数据流中提取 CADU)。下面给出 generateCADU 块的实现文件:

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "generateCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
  namespace ccsds {

/*
 * Public factory: builds a generateCADU_impl from the given parameters and
 * hands it to gnuradio::get_initial_sptr to obtain the shared pointer.
 */
generateCADU::sptr
generateCADU::make(int frameLength,std::string sync, int scramble, int rs, int intDepth)
{
  generateCADU_impl *impl =
    new generateCADU_impl(frameLength, sync, scramble, rs, intDepth);
  return gnuradio::get_initial_sptr(impl);
}

/*
 * The private constructor.
 *
 * The block takes one input stream of unsigned char and has no stream
 * output; frames leave through the message port "out".
 *
 * frameLength : stored in d_frameLength and used as the output multiple
 * sync        : hex string, converted to bytes by parse_string()
 * scramble    : d_scramble is true only when this equals 1
 * rs          : d_rs is true for rs >= 1, d_basis is true for rs >= 2
 * intDepth    : stored in d_intDepth
 */
generateCADU_impl::generateCADU_impl(int frameLength,std::string sync, int scramble, int rs, int intDepth)
  : gr::sync_block("generateCADU",
                   gr::io_signature::make(1, 1, sizeof(unsigned char)),
                   gr::io_signature::make(0, 0, 0)),
    d_frameLength(frameLength),
    d_scramble(scramble == 1),
    d_rs(rs >= 1),
    d_basis(rs >= 2),
    d_intDepth(intDepth)
{
  // Ask for item counts that are whole multiples of one frame.
  set_output_multiple(d_frameLength);

  // Message port on which the generated frames are published.
  message_port_register_out(pmt::mp("out"));

  // Sync marker, decoded from its hexadecimal text form.
  d_sync = parse_string(sync);
}

/*
 * Our virtual destructor.
 * The body is intentionally empty: this destructor performs no explicit
 * cleanup of its own.
 */
generateCADU_impl::~generateCADU_impl()
{
}

/*
 * Convert one hexadecimal digit ('0'-'9', 'A'-'F', 'a'-'f') to its value
 * in the range 0..15. Any other character terminates the process through
 * std::abort().
 */
unsigned char
generateCADU_impl::parse_hex(char c)
{
  int value = -1;

  if (c >= '0' && c <= '9')
    value = c - '0';
  else if (c >= 'A' && c <= 'F')
    value = c - 'A' + 10;
  else if (c >= 'a' && c <= 'f')
    value = c - 'a' + 10;

  // Not a hexadecimal digit.
  if (value < 0) std::abort();

  return static_cast<unsigned char>(value);
}

/*
 * Decode a string of hexadecimal digits into bytes, two digits per byte
 * with the first digit of each pair being the high nibble.
 * A string of odd length terminates the process through std::abort();
 * invalid digits are handled by parse_hex().
 */
std::vector<unsigned char>
generateCADU_impl::parse_string(const std::string & s)
{
  const std::size_t byteCount = s.size() / 2;
  if (s.size() != 2 * byteCount) std::abort();

  std::vector<unsigned char> bytes;
  bytes.reserve(byteCount);

  for (std::size_t pos = 0; pos < 2 * byteCount; pos += 2) {
    const unsigned char high = parse_hex(s[pos]);
    const unsigned char low = parse_hex(s[pos + 1]);
    bytes.push_back(16 * high + low);
  }

  return bytes;
}
/*
 * Cut the input byte stream into frames of d_frameLength bytes and publish
 * each one as a PDU on the message port "out".
 *
 * For every frame: optionally Reed-Solomon encode (d_rs), optionally
 * scramble (d_scramble), prepend the sync marker d_sync, then publish a
 * pair (PMT_NIL . blob) holding the resulting bytes.
 *
 * Changes with respect to the previous version:
 *  - the temporary malloc() buffer, which was never freed and therefore
 *    leaked on every call, is gone; the frame is copied straight into the
 *    std::vector;
 *  - frames are now read starting at in + i with an inclusive loop bound.
 *    Previously the copy started at in + i + d_frameLength, so the first
 *    frame of every call was consumed but never published.
 *
 * Returns noutput_items, i.e. all offered items are consumed. Any trailing
 * partial frame would be dropped; the constructor's
 * set_output_multiple(d_frameLength) is presumably what keeps
 * noutput_items a whole number of frames.
 */
int
generateCADU_impl::work(int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items)
{
  const unsigned char *in = (const unsigned char *) input_items[0];

  //Reed-Solomon and Scrambler objects
  ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
  Scrambler S;

  //Working buffer for the frame currently being built
  std::vector<unsigned char> frame;

  //The work function engine: one iteration per complete frame
  for (int i = 0; i + d_frameLength <= noutput_items; i += d_frameLength)
  {
    //Copy one frame from the input stream
    frame.assign(in + i, in + i + d_frameLength);

    //Optional Reed-Solomon and scrambling
    if (d_rs) RS.Encode_RS(frame);
    if (d_scramble) S.Scramble(frame);

    //Insert sync word in front of the frame
    frame.insert(frame.begin(), d_sync.begin(), d_sync.end());

    //Transmitting PDU
    pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frame.data(),frame.size())));
    message_port_pub(pmt::mp("out"), pdu);
  }

  // Tell runtime system how many items we processed.
  return noutput_items;
}

} /* namespace ccsds */
} /* namespace gr */

这里是 processCADU 块。此块使用由 synchronizeCADU(它只是 correlate_access_tag 块的包装器)生成的标签来提取 CADU

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "processCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
   namespace ccsds {

/*
 * Public factory: builds a processCADU_impl from the given parameters and
 * hands it to gnuradio::get_initial_sptr to obtain the shared pointer.
 */
processCADU::sptr
processCADU::make(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
{
  processCADU_impl *impl =
    new processCADU_impl(frameLength, scramble, rs, intDepth, tagName);
  return gnuradio::get_initial_sptr(impl);
}

/*
 * The private constructor.
 *
 * The block takes one input stream of unsigned char (work() treats it as
 * one bit per item) and has no stream output; frames leave through the
 * message port "out".
 *
 * frameLength : frame length in bytes; when Reed-Solomon is enabled,
 *               32 * intDepth bytes are added to it
 * scramble    : d_scramble is true only when this equals 1
 * rs          : d_rs is true for rs >= 1, d_basis is true for rs >= 2
 * intDepth    : stored in d_intDepth
 * tagName     : key of the stream tags that work() searches for
 *
 * Changes with respect to the previous version:
 *  - the output multiple is set AFTER d_frameLength has been enlarged for
 *    Reed-Solomon. Before, it was derived from the shorter length, so
 *    work() could be called with fewer items than one complete frame.
 *  - d_pack is initialised to NULL so that the destructor never deletes an
 *    indeterminate pointer when work() has not run.
 */
processCADU_impl::processCADU_impl(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
  : gr::sync_block("processCADU",
           gr::io_signature::make(1, 1, sizeof(unsigned char)),
           gr::io_signature::make(0, 0, 0)),
d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)
{
  //No packer allocated yet
  d_pack = NULL;

  //Account for the Reed-Solomon bytes first ...
  if (d_rs) d_frameLength += 32 * d_intDepth;

  //... so that the item multiple covers one whole frame, 8 items per byte
  set_output_multiple(d_frameLength * 8);

  //Registering output port
  message_port_register_out(pmt::mp("out"));

  //Setting tag name
  key = pmt::mp(tagName);
}

/*
 * Our virtual destructor.
 */
processCADU_impl::~processCADU_impl()
{
  delete d_pack;
}

int
processCADU_impl::work(int noutput_items,
           gr_vector_const_void_star &input_items,
           gr_vector_void_star &output_items)
{
  const unsigned char *in = (const unsigned char *) input_items[0];
  unsigned char *out = (unsigned char *) output_items[0];

  void *msg_data = NULL;
  unsigned char frame_data[d_frameLength];
  unsigned char frame_len = 0;
  std::vector<unsigned char> frameBuffer;

  //Reed-Solomon and Scrambler objects
  ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
  std::vector<int> errors;//errors.push_back(0);
  Scrambler S;

  d_tags.clear();
  d_pack = new blocks::kernel::pack_k_bits(8);

  this->get_tags_in_window(d_tags, 0, 0, noutput_items,key);
  for(d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++) {
// Check that we have enough data for a full frame
if ((d_tags_itr->offset - this->nitems_read(0)) > (noutput_items - (d_frameLength) * 8))
  {
    return (d_tags_itr->offset - this->nitems_read(0) - 1);
  }
//Pack bits into bytes
d_pack->pack(frame_data, &in[d_tags_itr->offset - this->nitems_read(0)], d_frameLength);

//Copying frame into std::vector buffer
frameBuffer.insert(frameBuffer.begin(),frame_data, frame_data + d_frameLength);

//Optional scrambling and Reed-Solomon
if (d_scramble) S.Scramble(frameBuffer);
//if (d_rs) RS.Decode_RS(frameBuffer,errors);
//If there is Reed-Solomon decoding

if(d_rs)
  {
    RS.Decode_RS(frameBuffer,errors);
    if (RS.Success(errors)) // Success
      {
    //std::cout << "Success" << std::endl;
    pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
        message_port_pub(pmt::mp("out"), pdu);
    /*for(int i=0; i < errors.size(); i++)
      {

        //std::cout << "Number of Errors : " << errors.at(i) << std::endl << std::endl;
        }*/
      }
    else // Failure
      {
    std::cout << "RS failure" << std::endl;
      }
  }
  else{
  pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
  message_port_pub(pmt::mp("out"), pdu);
  }

//Clear buffers
frameBuffer.clear();
errors.clear();
  }

  // Tell runtime system how many output items we produced.
  return noutput_items;
}

} /* namespace ccsds */
} /* namespace gr */

此致, M

感谢@MarcusMüller 的建议,使用 tagged_stream 范式而不是 PDU 解决了这个问题。我能够毫无问题地传输 47 TB 的数据。下面是新实现的块的代码。

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "genCADU_impl.h"


namespace gr {
  namespace ccsds {

/*
 * Public factory: builds a genCADU_impl from the given parameters and
 * hands it to gnuradio::get_initial_sptr to obtain the shared pointer.
 */
genCADU::sptr
genCADU::make(int frameLength,std::string sync, int scramble, int rs, int intDepth, std::string len_tag_key)
{
  genCADU_impl *impl =
    new genCADU_impl(frameLength, sync, scramble, rs, intDepth, len_tag_key);
  return gnuradio::get_initial_sptr(impl);
}

/*
 * The private constructor.
 *
 * Tagged-stream block with one unsigned char input and one unsigned char
 * output; len_tag_key is passed on to gr::tagged_stream_block.
 *
 * frameLength : stored in d_frameLength
 * sync        : hex string, converted to bytes by parse_string()
 * scramble    : d_scramble is true only when this equals 1
 * rs          : d_rs is true for rs >= 1, d_basis is true for rs >= 2
 * intDepth    : stored in d_intDepth
 */
genCADU_impl::genCADU_impl(int frameLength,std::string sync, int scramble, int rs, int intDepth, std::string len_tag_key)
  : gr::tagged_stream_block("genCADU",
                            gr::io_signature::make(1, 1, sizeof(unsigned char)),
                            gr::io_signature::make(1, 1, sizeof(unsigned char)),
                            len_tag_key),
    d_frameLength(frameLength),
    d_scramble(scramble == 1),
    d_rs(rs >= 1),
    d_basis(rs >= 2),
    d_intDepth(intDepth)
{
  // Sync marker, decoded from its hexadecimal text form.
  d_sync = parse_string(sync);

  // Encoder and scrambler live for the lifetime of the block; the
  // destructor deletes them.
  RS = new ReedSolomon(16, d_intDepth, d_basis); // False = conventional, True = dual-basis
  S = new Scrambler();
}

/*
 * Our virtual destructor.
 * Deletes the ReedSolomon (RS) and Scrambler (S) objects that the
 * constructor allocates with new.
 */
genCADU_impl::~genCADU_impl()
{
  delete RS;
  delete S;
}

/*
 * Length of one output packet: the frame plus the sync marker, plus
 * 32 * d_intDepth additional bytes when Reed-Solomon is enabled.
 * The ninput_items argument is not used.
 */
int
genCADU_impl::calculate_output_stream_length(const gr_vector_int &ninput_items)
{
  if (d_rs) {
    return d_frameLength + 32*d_intDepth + d_sync.size();
  }
  return d_frameLength + d_sync.size();
}

/*
 * Convert one hexadecimal digit ('0'-'9', 'A'-'F', 'a'-'f') to its value
 * in the range 0..15. Any other character terminates the process through
 * std::abort().
 */
unsigned char
genCADU_impl::parse_hex(char c)
{
  int value = -1;

  if (c >= '0' && c <= '9')
    value = c - '0';
  else if (c >= 'A' && c <= 'F')
    value = c - 'A' + 10;
  else if (c >= 'a' && c <= 'f')
    value = c - 'a' + 10;

  // Not a hexadecimal digit.
  if (value < 0) std::abort();

  return static_cast<unsigned char>(value);
}

/*
 * Decode a string of hexadecimal digits into bytes, two digits per byte
 * with the first digit of each pair being the high nibble.
 * A string of odd length terminates the process through std::abort();
 * invalid digits are handled by parse_hex().
 */
std::vector<unsigned char>
genCADU_impl::parse_string(const std::string & s)
{
  const std::size_t byteCount = s.size() / 2;
  if (s.size() != 2 * byteCount) std::abort();

  std::vector<unsigned char> bytes;
  bytes.reserve(byteCount);

  for (std::size_t pos = 0; pos < 2 * byteCount; pos += 2) {
    const unsigned char high = parse_hex(s[pos]);
    const unsigned char low = parse_hex(s[pos + 1]);
    bytes.push_back(16 * high + low);
  }

  return bytes;
}

/*
 * Build one output packet from one input packet.
 *
 * The first d_frameLength input bytes are copied into the member buffer,
 * optionally Reed-Solomon encoded (d_rs) and scrambled (d_scramble), the
 * sync marker d_sync is put in front, and the result is written to the
 * output. Input bytes beyond d_frameLength are not used.
 *
 * Returns the number of bytes written to the output, or 0 when the packet
 * is dropped (see the two guards below).
 *
 * Changes with respect to the previous version:
 *  - an input packet shorter than d_frameLength is dropped instead of
 *    reading past the end of the supplied input;
 *  - a result larger than noutput_items is dropped instead of writing past
 *    the offered output space;
 *  - the buffer is filled with assign(), so it no longer relies on having
 *    been cleared by the previous call.
 */
int
genCADU_impl::work (int noutput_items,
                   gr_vector_int &ninput_items,
                   gr_vector_const_void_star &input_items,
                   gr_vector_void_star &output_items)
{
  const unsigned char *in = (const unsigned char *) input_items[0];
  unsigned char *out = (unsigned char *) output_items[0];

  //Not enough input for a complete frame: produce nothing
  if (ninput_items[0] < d_frameLength) {
    return 0;
  }

  //Copy the frame from the input to the local buffer
  buffer.assign(in, in + d_frameLength);

  //Optional scrambling and Reed-Solomon. TO DO: Turbo and LDPC
  if (d_rs) RS->Encode_RS(buffer);
  if (d_scramble) S->Scramble(buffer);

  //Insert sync word
  buffer.insert(buffer.begin(), d_sync.begin(), d_sync.end());

  const int total_len = buffer.size();

  //Result does not fit in the offered output space: produce nothing
  if (total_len > noutput_items) {
    buffer.clear();
    return 0;
  }

  //Copy from the local buffer to the output
  std::copy(buffer.begin(),buffer.end(),out);

  //Clear the local buffer
  buffer.clear();

  // Tell runtime system how many output items we produced.
  return total_len;
}

} /* namespace ccsds */
} /* namespace gr */

此致,

M.