How to deal with extra characters read into ASIO streambuf?

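Most library parsers can work only on a std::istream or on a single contiguous buffer. Those parsers read the istream until eof, not until the end of the document. Even though there is a nice boost::asio::streambuf that can be used with an istream, there is a problem with reading and committing exactly one frame to it. Functions like read_until commit everything they read, and if they read a fragment of the next frame, parsing will fail.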

This mocked example on Coliru shows the problem.

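Assuming that we need an efficient solution that does not copy buffers, I need to ensure that the end of the stream is the correct end of the document. My current solution is to scan the data and repeatedly commit/consume on one prepared buffer: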

size_t read_some_frames( boost::asio::streambuf& strbuf, 
                         std::function< void(istream&) > parser ) {
        auto buffers= strbuf.prepare( 1024 );
        size_t read= bad_case_of_read_some( buffers );

        vector< std::pair< size_t, size_t > > frames;
        std::pair< size_t, size_t > leftover= scanForFrames( 
                    buffers_begin(buffers), 
                    buffers_begin(buffers)+read, 
                    frames, '\0' );

        for( auto const& frame: frames ) {
            cout << "Frame size: " << frame.first 
                      << " skip: " << frame.second << endl;
            strbuf.commit( frame.first );
            strbuf.consume( frame.second );
            iostream stream( &strbuf );
            parser( stream );
        }
        cout << "Unfinished frame size: " << leftover.first 
                             << " skip:" << leftover.second << endl;
        strbuf.commit( leftover.first );
        strbuf.consume( leftover.second );
        return read;
}

Live on Coliru

According to the documentation, this is wrong. I think this code works because calling commit and consume does not release the internal buffers. I need to deal with this somehow.

What are the possible solutions?

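In my opinion, strbuf + istream used like this is really not useful, except in the case where you use the stream only after reading until the connection was closed.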

The simple problem is that istream extraction does not automatically rewind the stream on a failed or partial parse, which leads to lost input and corruption.
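To see the effect without any Boost dependency, here is a minimal sketch that uses a std::stringstream as a stand-in for the asio::streambuf plus istream pair. The names parse_frame and parse_split_frame are made up for illustration, and a plain integer extraction followed by a '\0' check plays the role of the real parser.

```cpp
#include <cassert>
#include <istream>
#include <sstream>

// Hypothetical frame parser: reads "<int>\0" from the stream.
// Returns false when the delimiter is missing, e.g. because only part of
// the frame has arrived. The digits read so far are consumed either way.
bool parse_frame(std::istream& is, int& value) {
    return (is >> value) && is.get() == '\0';
}

// Delivers the single frame "23456\0" in two pieces and retries the parse
// after the second piece, the same way the mock example does.
int parse_split_frame() {
    std::stringstream stream;   // stand-in for streambuf + istream
    int value = 0;

    stream << "23";                          // first chunk: partial frame
    if (!parse_frame(stream, value)) {       // fails, but "23" is already gone
        stream.clear();                      // reset eofbit/failbit
        stream << "456" << '\0';             // the rest of the frame arrives
        parse_frame(stream, value);          // parses "456\0" on its own
    }
    return value;
}
```

The retry reports 456 instead of 23456, because the failed first attempt had already extracted the leading digits.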

Here is a fixed-up version of the mock example:

Live On Coliru

#include <iostream>
#include <utility>
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/qi_match.hpp>

namespace asio = boost::asio;

std::istream &parseDocument(std::istream &is, int &data) {
    namespace qi = boost::spirit::qi;
    return is >> qi::match(qi::int_ >> '\0', data);
}

template <typename MutableBuffers> size_t 
    fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }

template <typename MutableBuffers> size_t 
    fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }

#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
    auto buffers = strbuf.prepare(1024); \
    size_t read = fake_read(buffers); \
    std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
    strbuf.commit(read); \
}

int main() {
    // this is the easy scenario:
    {
        asio::streambuf strbuf;

        READ_UNTIL(strbuf, fake_read1);
        READ_UNTIL(strbuf, fake_read2);

        int data1, data2;
        std::istream stream(&strbuf);

        parseDocument(stream, data1);
        parseDocument(stream, data2);

        std::cout << "Yo: " << data1 << "\n";
        std::cout << "Yo: " << data2 << "\n";
    }

    // this is the tricky scenario:
    {
        asio::streambuf strbuf;

        READ_UNTIL(strbuf, fake_read1);
        //READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame

        int data1, data2;
        std::istream stream(&strbuf);

        parseDocument(stream, data1);

        while (!parseDocument(stream, data2)) {
            stream.clear();
            READ_UNTIL(strbuf, fake_read2);
        }

        std::cout << "Oops: " << data1 << "\n";
        std::cout << "Oops: " << data2 << "\n";
    }
}

In the "tricky" scenario you can see that the partial packet (containing "23") is lost, and the subsequent packet is corrupted as a result:

READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Oops: 12345
Oops: 456

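You can also see that, in the parseDocument() function, I switched to my preferred framework for small, ad-hoc parsers: Boost Spirit. See below for how I make this more applicable.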

1. Underflowing stream buffers

What you are probably looking for is a stream buffer implementation that simply waits for more data when the buffer underflows.

I believe that, for example, asio::ip::tcp::iostream is exactly that:

Live On Coliru

#include <iostream>
#include <boost/asio.hpp>

int main() {
    std::cout << boost::asio::ip::tcp::iostream("127.0.0.1", "6769").rdbuf();
}

Run this locally to see that input arrives packet-wise (e.g. with netcat)
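The idea of "wait for more data on underflow" can also be sketched with nothing but the standard library. In the sketch below, refilling_streambuf and its fetch callback are hypothetical names; the callback stands in for a blocking socket read, and parse_two_frames replays the two reads from the mock example.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <istream>
#include <streambuf>
#include <string>
#include <utility>

// A stream buffer that asks a callback for more data whenever the get
// area runs dry, instead of reporting end-of-file straight away.
// An empty string from `fetch` means the peer closed the connection.
class refilling_streambuf : public std::streambuf {
public:
    explicit refilling_streambuf(std::function<std::string()> fetch)
        : fetch_(std::move(fetch)) {}

protected:
    int_type underflow() override {
        if (gptr() < egptr())
            return traits_type::to_int_type(*gptr());
        chunk_ = fetch_();                       // "wait" for the next packet
        if (chunk_.empty())
            return traits_type::eof();           // genuine end of input
        setg(&chunk_[0], &chunk_[0], &chunk_[0] + chunk_.size());
        return traits_type::to_int_type(*gptr());
    }

private:
    std::function<std::string()> fetch_;
    std::string chunk_;                          // packet currently being read
};

// Feeds "12345\0" "23" and then "456\0" through the buffer and extracts
// two NUL-terminated integers.
std::pair<int, int> parse_two_frames() {
    const std::string packets[] = {std::string("12345\0" "23", 8),
                                   std::string("456\0", 4)};
    std::size_t next = 0;
    refilling_streambuf buf([&]() {
        return next < 2 ? packets[next++] : std::string();
    });
    std::istream stream(&buf);
    int a = 0, b = 0;
    stream >> a; stream.get();   // first frame, then its '\0'
    stream >> b; stream.get();   // second frame spans both packets
    return {a, b};
}
```

Because the extraction of the second integer triggers underflow() in the middle of the number, the digits from both packets end up in one value and nothing is lost.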

2. Parse the underlying ConstBuffers sequence

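Alternatively, in the spirit of zero-copy, you might want to parse directly on the underlying buffer sequence that backs the asio::streambuf implementation, making sure to consume() only what you have successfully parsed: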

Live On Coliru

#include <iostream>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>

using namespace std;
namespace asio = boost::asio;

using asio::buffers_begin;
using asio::buffers_end;

template <typename ConstBuffers>
size_t parseDocument(ConstBuffers const& buffers, int &data) {

    auto b(buffers_begin(buffers)), f=b, l(buffers_end(buffers));

    namespace qi = boost::spirit::qi;
    return qi::phrase_parse(f, l, qi::int_ >> '\0', qi::space, data)
        ? (f - b) 
        : 0; // only optionally consume
}

template <typename MutableBuffers> size_t 
    fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }

template <typename MutableBuffers> size_t 
    fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }

#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
    auto buffers = strbuf.prepare(1024); \
    size_t read = fake_read(buffers); \
    std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
    strbuf.commit(read); \
}

size_t readuntil2(boost::asio::streambuf &strbuf) {

    std::cout << __PRETTY_FUNCTION__ << "\n";
    static int delay_fake_async_receive = 6;
    if (delay_fake_async_receive--)
        return 0;

    auto buffers = strbuf.prepare(1024);
    size_t read = fake_read2(buffers);
    std::cout << "read2: " << read << " bytes\n";
    strbuf.commit(read);
    return read;
}

#include <boost/range/algorithm.hpp>

int main() {
    // this is the tricky scenario:
    asio::streambuf strbuf;

    READ_UNTIL(strbuf, fake_read1);
    //READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame

    int data1=0, data2=0;

    strbuf.consume(parseDocument(strbuf.data(), data1));

    size_t consumed = 0;
    while (!(consumed = parseDocument(strbuf.data(), data2))) {
        READ_UNTIL(strbuf, fake_read2);
    }
    strbuf.consume(consumed); // release the second frame now that it has parsed

    std::cout << "Yay: " << data1 << "\n";
    std::cout << "Yay: " << data2 << "\n";

    //asio::ip::tcp::iostream networkstream("localhost", "6767");
    std::cout << asio::ip::tcp::iostream("localhost", "6767").rdbuf();
}

Prints

READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Yay: 12345
Yay: 23456
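The same "consume only what parsed" rule, stripped of Boost, might look like the sketch below. Here a std::string plays the role of the streambuf's input sequence, erase() plays the role of consume(), and the hand-rolled digit loop in parse_frame is only a placeholder for a real grammar (it does no overflow checking and treats malformed input like an incomplete frame).

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Tries to parse one "<digits>\0" frame from the front of `buffer`.
// Returns the number of bytes the frame occupies (delimiter included),
// or 0 when no complete, well-formed frame is available yet.
std::size_t parse_frame(const std::string& buffer, int& value) {
    const std::size_t end = buffer.find('\0');
    if (end == std::string::npos || end == 0)
        return 0;                                // delimiter not seen yet
    int parsed = 0;
    for (std::size_t i = 0; i < end; ++i) {
        if (buffer[i] < '0' || buffer[i] > '9')
            return 0;                            // a real parser would report this
        parsed = parsed * 10 + (buffer[i] - '0');
    }
    value = parsed;
    return end + 1;
}

// Replays the tricky scenario: the second frame arrives in two reads.
int second_frame_after_split() {
    std::string buffer("12345\0" "23", 8);       // first read
    int first = 0, second = 0;
    buffer.erase(0, parse_frame(buffer, first)); // consume the first frame only

    std::size_t used = parse_frame(buffer, second);
    if (used == 0) {                             // "23" alone is incomplete
        buffer.append("456\0", 4);               // second read
        used = parse_frame(buffer, second);
    }
    buffer.erase(0, used);
    return second;
}
```

A failed attempt leaves the buffer untouched, so the retry sees "23456\0" as a whole.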

Wrapping up: integrating with 3rd-party parsers

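If you must use a third-party library that requires a std::istream& to parse from, but you cannot rely on transmissions being aligned with frame boundaries, you could perhaps use a hybrid approach: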

auto n = find_frame_boundary(buffers_begin(sb.data()), buffers_end(sb.data()));

and then perhaps use a boost::iostreams::array_source on the narrowed-down region that was detected.
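A rough sketch of that hybrid approach with only the standard library follows. It assumes the received bytes are contiguous and uses a '\0' delimiter; find_frame_boundary is given one possible definition, span_streambuf is a hypothetical stand-in for boost::iostreams::array_source, and slurp stands in for a third-party parser that reads until EOF.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <istream>
#include <iterator>
#include <streambuf>
#include <string>

// Size of the first frame in [first, last), delimiter included, or 0 when
// the delimiter has not arrived yet.
std::size_t find_frame_boundary(const char* first, const char* last,
                                char delim = '\0') {
    const char* hit = std::find(first, last, delim);
    return hit == last ? 0 : static_cast<std::size_t>(hit - first) + 1;
}

// Non-owning, read-only stream buffer over [first, last). No data is copied.
class span_streambuf : public std::streambuf {
public:
    span_streambuf(const char* first, const char* last) {
        char* b = const_cast<char*>(first);      // the get area is only read
        setg(b, b, const_cast<char*>(last));
    }
};

// Stand-in for a parser that consumes its std::istream until EOF.
std::string slurp(std::istream& is) {
    return std::string(std::istreambuf_iterator<char>(is),
                       std::istreambuf_iterator<char>());
}

// Hands only the first complete frame to `slurp`. Returns how many bytes the
// caller may consume, or 0 if more data has to be read first.
std::size_t parse_first_frame(const std::string& received,
                              std::string& document) {
    const char* data = received.data();
    const std::size_t n = find_frame_boundary(data, data + received.size());
    if (n == 0)
        return 0;
    span_streambuf window(data, data + n - 1);   // exclude the delimiter
    std::istream is(&window);
    document = slurp(is);
    return n;
}
```

The parser sees EOF exactly at the frame boundary, and the trailing fragment of the next frame stays in the buffer for the following read.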

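While the read_until() operations commit all data read into the streambuf's input sequence, they return a bytes_transferred value containing the number of bytes up to and including the first delimiter. In essence, this provides the size of the frame, and one can limit an istream to read only a portion of the streambuf's input sequence by either: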

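  • Using a custom istream that limits the number of bytes read from a streambuf. One of the easier ways to accomplish this is to use Boost.IOStream's boost::iostreams::stream and implement a model of the Source concept.
  • Creating a custom streambuf that derives from Boost.Asio's streambuf. To limit the number of bytes read from the available input sequence, custom functions will need to manipulate the end of the input sequence. Furthermore, the custom streambuf will need to handle underflow.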

Custom Source for Boost.IOStream

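Boost.IOStream's boost::iostreams::stream object delegates I/O operations to a Device. The Device is user code that implements a model of one or more Boost.IOStream concepts. In this case, the Source concept, which provides read access to a sequence of characters, is the only one needed. Furthermore, when boost::iostreams::stream uses a Source Device, it inherits from std::basic_istream.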

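In the following code, asio_streambuf_input_device is a model of the Source concept that reads from a Boost.Asio streambuf. Once a given number of bytes has been read, asio_streambuf_input_device indicates underflow, even if the underlying streambuf still has data in its input sequence.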

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

// ...

// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);

Here is a complete example demonstrating this approach:

#include <algorithm>   // std::min, std::copy_n
#include <cassert>
#include <functional>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp>  // boost::iostreams::source
#include <boost/iostreams/stream.hpp>

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  boost::asio::streambuf streambuf;

  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }
  std::cout << "streambuf contains: " << make_string(streambuf) <<
               std::endl;

  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}

Output:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty

Deriving from boost::asio::streambuf

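One can safely derive from Boost.Asio's streambuf and implement custom behavior. In this case, the goal is to limit the number of bytes an istream can extract from the input sequence before causing underflow. This can be accomplished by: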

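  • Updating the streambuf's get-area (input sequence) pointers so that the get area contains only the desired number of bytes to be read. This is done by setting the end-of-get-area pointer (egptr) to n bytes after the current get-area pointer (gptr). In the code below, I refer to this as framing.
  • Handling underflow(). If the end of the current frame has been reached, return EOF.
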
/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

// ...

basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();

Here is a complete example demonstrating this approach:

#include <cassert>
#include <functional>
#include <iostream>
#include <limits>      // std::numeric_limits
#include <memory>      // std::allocator
#include <string>

#include <boost/asio.hpp>

/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

typedef basic_framed_streambuf<> framed_streambuf;

/// @brief RAII type that helps frame a basic_framed_streambuf within a 
///        given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
  explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
    : streambuf_(streambuf)
  {
    streambuf_.frame(n);
  }

  ~streambuf_frame() { streambuf_.unframe(); }

  streambuf_frame(const streambuf_frame&) = delete;
  streambuf_frame& operator=(const streambuf_frame&) = delete;

private:
  Streambuf& streambuf_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  framed_streambuf streambuf;

  // Demonstrate framing the streambuf's input sequence manually.
  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.
    streambuf.frame(bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;

    // Restore the streambuf.
    streambuf.unframe();
  }

  // Demonstrate using an RAII helper to frame the streambuf's input
  // sequence.
  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.  Use a frame RAII object
    // to only frame the streambuf within the current scope.
    streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
    // The frame object's destructor will unframe the streambuf.
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}

Output:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty