How to deal with extra characters read into ASIO streambuf?
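Most libraries' parsers work only on a std::istream or on a single contiguous buffer. Those parsers read the istream until EOF, not until the end of the document. Even though there is a nice boost::asio::streambuf that can be used with an istream, there is a problem with reading and committing only one frame to it. Functions like read_until commit everything they read, and if they read a fragment of the next frame, parsing will fail.
This mocked example on Coliru shows the problem.
Assuming we need an efficient solution that does not copy buffers, I need to ensure that the end of the stream is the correct end of the document. My current solution is to scan the data and call commit/consume multiple times over one prepared buffer: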
size_t read_some_frames( boost::asio::streambuf& strbuf,
                         std::function< void(istream&) > parser ) {
    auto buffers= strbuf.prepare( 1024 );
    size_t read= bad_case_of_read_some( buffers );
    vector< std::pair< size_t, size_t > > frames;
    std::pair< size_t, size_t > leftover= scanForFrames(
        buffers_begin(buffers),
        buffers_begin(buffers)+read,
        frames, '\0' );
    for( auto const& frame: frames ) {
        cout << "Frame size: " << frame.first
             << " skip: " << frame.second << endl;
        strbuf.commit( frame.first );
        strbuf.consume( frame.second );
        iostream stream( &strbuf );
        parser( stream );
    }
    cout << "Unfinished frame size: " << leftover.first
         << " skip:" << leftover.second << endl;
    strbuf.commit( leftover.first );
    strbuf.consume( leftover.second );
    return read;
}
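According to the documentation, this is wrong. I think this code works because calling commit and consume does not release the internal buffers. I need to deal with this somehow.
What are the possible solutions?
I'd argue that such a strbuf + istream combination is indeed not useful, except when you use the stream only after reading until the connection closes.
The simple problem is that istream extraction does not automatically update the stream on a failed or partial parse, which leads to lost input or corruption.
Here is your mock example, fixed up: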
#include <iostream>
#include <utility>
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/qi_match.hpp>
namespace asio = boost::asio;
std::istream &parseDocument(std::istream &is, int &data) {
    namespace qi = boost::spirit::qi;
    return is >> qi::match(qi::int_ >> '\0', data);
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
int main() {
// this is the easy scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
READ_UNTIL(strbuf, fake_read2);
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
parseDocument(stream, data2);
std::cout << "Yo: " << data1 << "\n";
std::cout << "Yo: " << data2 << "\n";
}
// this is the tricky scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
while (!parseDocument(stream, data2)) {
stream.clear();
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Oops: " << data1 << "\n";
std::cout << "Oops: " << data2 << "\n";
}
}
In the "tricky" scenario you can see that the partial packet (containing "23") is lost, and the subsequent packet is corrupted:
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Oops: 12345
Oops: 456
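Note also that I switched to my preferred small, ad hoc parser framework in the parseDocument() function: Boost Spirit. See below for how I made this more applicable.
1. Underflowing stream buffer
What you are probably looking for is a stream buffer implementation that simply waits for more data when the buffer underflows.
I believe that, for example, asio::ip::tcp::iostream is exactly that: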
#include <iostream>
#include <boost/asio.hpp>
int main() {
std::cout << boost::asio::ip::tcp::iostream("127.0.0.1", "6769").rdbuf();
}
Run this locally to see that input arrives packet-wise (e.g. with netcat)
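To make the "wait for more data on underflow" idea concrete without a network, here is a minimal standard-library sketch. It is not how asio::ip::tcp::iostream is implemented; refilling_streambuf, read_all_frames and the '@' delimiter are names and choices I made up for illustration, and an empty packet is assumed to mean end of stream.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <istream>
#include <streambuf>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stream buffer: when the get area runs dry, underflow() asks a
// callback for the next packet instead of reporting EOF straight away.
class refilling_streambuf : public std::streambuf {
public:
    explicit refilling_streambuf(std::function<std::string()> fetch)
        : fetch_(std::move(fetch)) { assert(static_cast<bool>(fetch_)); }

protected:
    int_type underflow() override {
        if (gptr() < egptr())
            return traits_type::to_int_type(*gptr());
        chunk_ = fetch_();              // a real transport would block here
        if (chunk_.empty())
            return traits_type::eof();  // assumption: empty packet == end of stream
        setg(&chunk_[0], &chunk_[0], &chunk_[0] + chunk_.size());
        return traits_type::to_int_type(*gptr());
    }

private:
    std::function<std::string()> fetch_;
    std::string chunk_;                 // owns the bytes of the current packet
};

// Parses '@'-terminated integer frames from packets that may split a frame.
std::vector<int> read_all_frames(std::deque<std::string> packets) {
    refilling_streambuf buf([&packets]() -> std::string {
        if (packets.empty()) return std::string();
        std::string next = std::move(packets.front());
        packets.pop_front();
        return next;
    });
    std::istream in(&buf);
    std::vector<int> frames;
    int value = 0;
    char delim = 0;
    // Extraction simply continues into the next packet when one runs out.
    while (in >> value && in.get(delim) && delim == '@')
        frames.push_back(value);
    return frames;
}
```

Calling read_all_frames({"12345@23", "456@"}) should yield 12345 and 23456: the "23" fragment is not lost, because the extraction in progress pulls in the next packet instead of failing.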
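2. Parse the underlying ConstBuffers sequence
Alternatively, in the spirit of zero-copy, you might want to parse directly on the buffer sequence that underlies the asio::streambuf implementation, making sure to consume() only what you have successfully parsed: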
#include <iostream>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
using namespace std;
namespace asio = boost::asio;
using asio::buffers_begin;
using asio::buffers_end;
template <typename ConstBuffers>
size_t parseDocument(ConstBuffers const& buffers, int &data) {
    auto b(buffers_begin(buffers)), f=b, l(buffers_end(buffers));
    namespace qi = boost::spirit::qi;
    return qi::phrase_parse(f, l, qi::int_ >> '\0', qi::space, data)
        ? (f - b)
        : 0; // only optionally consume
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "\0" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "\0", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
size_t readuntil2(boost::asio::streambuf &strbuf) {
std::cout << __PRETTY_FUNCTION__ << "\n";
static int delay_fake_async_receive = 6;
if (delay_fake_async_receive--)
return 0;
auto buffers = strbuf.prepare(1024);
size_t read = fake_read2(buffers);
std::cout << "read2: " << read << " bytes\n";
strbuf.commit(read);
return read;
}
#include <boost/range/algorithm.hpp>
int main() {
// this is the tricky scenario:
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1=0, data2=0;
strbuf.consume(parseDocument(strbuf.data(), data1));
size_t consumed = 0;
while (!(consumed = parseDocument(strbuf.data(), data2))) {
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Yay: " << data1 << "\n";
std::cout << "Yay: " << data2 << "\n";
//asio::ip::tcp::iostream networkstream("localhost", "6767");
std::cout << asio::ip::tcp::iostream("localhost", "6767").rdbuf();
}
Prints
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Yay: 12345
Yay: 23456
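The "consume only what parsed" rule does not depend on Spirit or Asio. As a rough standard-library sketch, with a std::string standing in for the streambuf and erase() standing in for consume(), it might look like the following. The names parse_document and feed are mine. The sketch does not check for integer overflow, and it returns 0 for a malformed or empty frame exactly as it does for an incomplete one, which real code would want to distinguish.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Returns the number of bytes that make up one complete frame (digits plus
// the '\0' delimiter), or 0 when no complete, well-formed frame is buffered.
std::size_t parse_document(const std::string& buffer, int& data) {
    const std::size_t end = buffer.find('\0');
    if (end == std::string::npos || end == 0)
        return 0;                        // delimiter not seen yet, or empty frame
    int value = 0;
    for (std::size_t i = 0; i < end; ++i) {
        if (buffer[i] < '0' || buffer[i] > '9')
            return 0;                    // malformed frame: nothing is consumed
        value = value * 10 + (buffer[i] - '0');
    }
    data = value;
    return end + 1;                      // include the delimiter
}

// Appends each packet to the pending bytes and drains every complete frame.
std::vector<int> feed(const std::vector<std::string>& packets) {
    std::string buffer;                  // stand-in for the streambuf contents
    std::vector<int> parsed;
    for (const std::string& packet : packets) {
        buffer += packet;                // "commit" the newly received bytes
        int data = 0;
        while (const std::size_t n = parse_document(buffer, data)) {
            assert(n <= buffer.size());
            parsed.push_back(data);
            buffer.erase(0, n);          // "consume" only what was parsed
        }
    }
    return parsed;
}
```

Feeding the two mock packets from above ("12345", NUL, "23" and then "456", NUL) should produce 12345 and 23456, because the partial "23" stays in the buffer until the rest of its frame arrives.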
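Summarizing: integrating third-party parsers
If you must use a third-party library that requires a std::istream& to parse from, but you cannot rely on transmissions being aligned with frame boundaries, you could perhaps use a hybrid approach:
auto n = find_frame_boundary(buffers_begin(sb.data()), buffers_end(sb.data()));
and then perhaps use a boost::iostreams::array_source on the reduced region that was detected.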
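A minimal sketch of that hybrid approach, using only the standard library: span_streambuf is a hand-rolled stand-in for an array source, third_party_parse is a stand-in for the library parser, and parse_first_frame ties them together. All three names are hypothetical, as is this particular implementation of find_frame_boundary. The sketch assumes the received bytes are contiguous in memory.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <istream>
#include <streambuf>
#include <string>

// Hypothetical read-only stream buffer over [first, last); nothing is copied.
class span_streambuf : public std::streambuf {
public:
    span_streambuf(const char* first, const char* last) {
        assert(first <= last);
        // setg() wants non-const pointers. This sketch never writes through
        // them: no put area is set and pbackfail() is not overridden.
        char* begin = const_cast<char*>(first);
        setg(begin, begin, const_cast<char*>(last));
    }
};

// Size of the first frame including its delimiter, or 0 if none is complete.
std::size_t find_frame_boundary(const char* first, const char* last, char delim) {
    const char* pos = std::find(first, last, delim);
    return pos == last ? 0 : static_cast<std::size_t>(pos - first) + 1;
}

// Stand-in for a third-party parser that only understands std::istream.
bool third_party_parse(std::istream& is, int& value) {
    return static_cast<bool>(is >> value);
}

// Returns the number of bytes the caller may consume, or 0 if it should wait.
std::size_t parse_first_frame(const std::string& received, char delim, int& value) {
    const char* first = received.data();
    const char* last = first + received.size();
    const std::size_t n = find_frame_boundary(first, last, delim);
    if (n == 0)
        return 0;                               // no complete frame buffered yet
    span_streambuf frame(first, first + n - 1); // the parser sees EOF at the delimiter
    std::istream is(&frame);
    return third_party_parse(is, value) ? n : 0;
}
```

The parser hits EOF exactly at the frame boundary, so it cannot run into a fragment of the next frame. The returned size is what you would then pass to consume().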
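While the read_until() operations commit all data read into the streambuf's input sequence, they return a bytes_transferred value containing the number of bytes up to and including the first delimiter. In essence, this provides the size of the frame, and one can limit an istream to read only a portion of the streambuf's input sequence by either:
- Using a custom istream that limits the number of bytes read from the streambuf. One of the easier ways to accomplish this is to use Boost.Iostreams' boost::iostreams::stream and implement a model of the Source concept.
- Creating a custom streambuf that derives from Boost.Asio's streambuf. To limit the number of bytes read from the available input sequence, custom functions will need to manipulate the end of the input sequence. Furthermore, the custom streambuf will need to handle underflow.
Custom Source for Boost.Iostreams
Boost.Iostreams' boost::iostreams::stream object delegates I/O operations to a Device. A Device is user code that implements a model of various Boost.Iostreams concepts. In this case, the Source concept, which provides read access to a sequence of characters, is the only one needed. Furthermore, when boost::iostreams::stream uses a Source Device, it will inherit from std::basic_istream.
In the following code, asio_streambuf_input_device is a model of the Source concept that reads from a Boost.Asio streambuf. When a given number of bytes has been read, asio_streambuf_input_device indicates underflow even if the underlying streambuf still has data in its input sequence.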
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
// ...
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);
Here is a complete example demonstrating this approach:
#include <algorithm> // std::min, std::copy_n
#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp> // boost::iostreams::source
#include <boost/iostreams/stream.hpp>
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
boost::asio::streambuf streambuf;
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
Output:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
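If Boost.Iostreams is not an option, the same byte-limiting idea could probably be sketched with a plain std::streambuf wrapper. This is an assumption-laden illustration, not part of the approach above: limited_streambuf and read_frame are hypothetical names, and the wrapper pulls one byte at a time from the source, which keeps the sketch short but is not efficient. A byte counts as consumed from the source as soon as the wrapper makes it visible, even if the parser only peeks at it.

```cpp
#include <cassert>
#include <istream>
#include <iterator>
#include <sstream>   // std::stringbuf, used as a stand-in source in the usage note
#include <streambuf>
#include <string>

// Hypothetical wrapper that lets a reader see at most `limit` bytes of
// another stream buffer before reporting EOF.
class limited_streambuf : public std::streambuf {
public:
    limited_streambuf(std::streambuf& source, std::streamsize limit)
        : source_(source), remaining_(limit) { assert(limit >= 0); }

protected:
    int_type underflow() override {
        if (gptr() < egptr())
            return traits_type::to_int_type(*gptr());
        if (remaining_ == 0)
            return traits_type::eof();          // end of frame reached
        const int_type c = source_.sbumpc();    // consume one byte from the source
        if (traits_type::eq_int_type(c, traits_type::eof()))
            return c;                           // source ran out before the limit
        --remaining_;
        current_ = traits_type::to_char_type(c);
        setg(&current_, &current_, &current_ + 1);
        return c;
    }

private:
    std::streambuf& source_;
    std::streamsize remaining_;
    char current_ = 0;                          // one-byte get area
};

// Reads everything an istream can see through a frame of frame_size bytes.
std::string read_frame(std::streambuf& source, std::streamsize frame_size) {
    limited_streambuf frame(source, frame_size);
    std::istream in(&frame);
    return std::string(std::istreambuf_iterator<char>(in),
                       std::istreambuf_iterator<char>());
}
```

With a std::stringbuf holding "12@345@" as the source, read_frame(source, 3) should return "12@" and leave "345@" behind for a following read_frame(source, 4).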
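Deriving from boost::asio::streambuf
One can safely derive from Boost.Asio's streambuf and implement custom behavior. In this case, the goal is to limit the number of bytes an istream can extract from the input sequence before causing underflow. This can be accomplished by:
- Updating the streambuf's get-area (input sequence) pointers so that they only contain the desired number of bytes to be read. This is done by setting the end of the get-area pointer (egptr) to be n bytes after the current get-area pointer (gptr). In the code below, I refer to this as framing.
- Handling underflow(). If the end of the current frame has been reached, then return EOF.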
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer..
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
// ...
basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();
Here is a complete example demonstrating this approach:
#include <cassert>
#include <functional>
#include <iostream>
#include <limits> // std::numeric_limits
#include <memory> // std::allocator
#include <string>
#include <boost/asio.hpp>
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer..
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
typedef basic_framed_streambuf<> framed_streambuf;
/// @brief RAII type that helps frame a basic_framed_streambuf within a
/// given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
: streambuf_(streambuf)
{
streambuf_.frame(n);
}
~streambuf_frame() { streambuf_.unframe(); }
streambuf_frame(const streambuf_frame&) = delete;
streambuf_frame& operator=(const streambuf_frame&) = delete;
private:
Streambuf& streambuf_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
framed_streambuf streambuf;
// Demonstrate framing the streambuf's input sequence manually.
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter.
streambuf.frame(bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// Restore the streambuf.
streambuf.unframe();
}
// Demonstrate using an RAII helper to frame the streambuf's input
// sequence.
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter. Use a frame RAII object
// to only frame the streambuf within the current scope.
streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// The frame object's destructor will unframe the streambuf.
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
Output:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
大多数库的解析器只能在 std::istream
或单个连续缓冲区上工作。这些解析器读取 istream 直到 eof,而不是文档的末尾。即使有一个很好的 boost::asio::streambuf
,可以与 istream
一起使用,也存在读取和仅向其提交一帧的问题。 read_until
之类的函数正在提交它们读取的任何内容,如果它们读取下一帧的片段,则解析填充将失败。
这个模拟的 example on Coliru 显示了问题。
假设我们需要一个高效的解决方案,无需复制缓冲区,我需要确保流的结尾是文档的正确结尾。我当前的解决方案是扫描数据并在一个准备好的缓冲区上乘 commit/consume:
size_t read_some_frames( boost::asio::streambuf& strbuf,
std::function< void(istream&) > parser ) {
auto buffers= strbuf.prepare( 1024 );
size_t read= bad_case_of_read_some( buffers );
vector< std::pair< size_t, size_t > > frames;
std::pair< size_t, size_t > leftover= scanForFrames(
buffers_begin(buffers),
buffers_begin(buffers)+read,
frames, '[=10=]' );
for( auto const& frame: frames ) {
cout << "Frame size: " << frame.first
<< " skip: " << frame.second << endl;
strbuf.commit( frame.first );
strbuf.consume( frame.second );
iostream stream( &strbuf );
parser( stream );
}
cout << "Unfinished frame size: " << leftover.first
<< " skip:" << leftover.second << endl;
strbuf.commit( leftover.first );
strbuf.consume( leftover.second );
return read;
}
根据documentation, this is wrong. I think this code work, because that calling commit and consume,不要释放内部缓冲区。我需要以某种方式处理这个问题。
可能的解决方案是什么?
除了在连接关闭之前读取后使用流的情况外,我认为这样的 strbuf + istream 确实没有用。
简单的问题是 istream 提取不会在 failed/partial 解析导致丢失 input/corruption 上自动更新流。
这是固定为
#include <iostream>
#include <utility>
#include <algorithm>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/qi_match.hpp>
namespace asio = boost::asio;
std::istream &parseDocument(std::istream &is, int &data) {
namespace qi = boost::spirit::qi;
return is >> qi::match(qi::int_ >> '[=10=]', data);
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "[=10=]" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "[=10=]", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
int main() {
// this is the easy scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
READ_UNTIL(strbuf, fake_read2);
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
parseDocument(stream, data2);
std::cout << "Yo: " << data1 << "\n";
std::cout << "Yo: " << data2 << "\n";
}
// this is the tricky scenario:
{
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1, data2;
std::istream stream(&strbuf);
parseDocument(stream, data1);
while (!parseDocument(stream, data2)) {
stream.clear();
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Oops: " << data1 << "\n";
std::cout << "Oops: " << data2 << "\n";
}
}
在"tricky"场景中,您可以看到部分数据包(包含“23”)丢失,后续数据包损坏:
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Oops: 12345
Oops: 456
您还可以看到我在 parseDocument()
函数中切换到我的首选小型、临时解析器框架:Boost Spirit。请参阅下文,了解我如何使它更适用。
1。下溢流缓冲区
您可能寻找的是一个流缓冲区实现,当缓冲区下溢时它只会等待更多数据。
我相信,例如asio::ip::tcp::iostream
正是:
#include <iostream>
#include <boost/asio.hpp>
int main() {
std::cout << boost::asio::ip::tcp::iostream("127.0.0.1", "6769").rdbuf();
}
Run this locally to see that input arrives packet-wise (e.g. with netcat)
2。解析底层 ConstBuffers
序列
或者,本着零拷贝的精神,您可能希望直接在 asio::streambuf
实现的基础缓冲区序列上进行解析,确保仅 consume()
您所拥有的 成功解析:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/spirit/include/qi.hpp>
using namespace std;
namespace asio = boost::asio;
using asio::buffers_begin;
using asio::buffers_end;
template <typename ConstBuffers>
size_t parseDocument(ConstBuffers const& buffers, int &data) {
auto b(buffers_begin(buffers)), f=b, l(buffers_end(buffers));
namespace qi = boost::spirit::qi;
return qi::phrase_parse(f, l, qi::int_ >> '[=13=]', qi::space, data)
? (f - b)
: 0; // only optionally consume
}
template <typename MutableBuffers> size_t
fake_read1(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("12345" "[=13=]" "23", 8)); }
template <typename MutableBuffers> size_t
fake_read2(MutableBuffers const &outbuf) { return asio::buffer_copy(outbuf, asio::buffer("456" "[=13=]", 4)); }
#define READ_UNTIL(/*boost::asio::streambuf &*/strbuf, fake_read) { \
auto buffers = strbuf.prepare(1024); \
size_t read = fake_read(buffers); \
std::cout << "READ_UNTIL " #fake_read ": " << read << " bytes\n"; \
strbuf.commit(read); \
}
size_t readuntil2(boost::asio::streambuf &strbuf) {
std::cout << __PRETTY_FUNCTION__ << "\n";
static int delay_fake_async_receive = 6;
if (delay_fake_async_receive--)
return 0;
auto buffers = strbuf.prepare(1024);
size_t read = fake_read2(buffers);
std::cout << "read2: " << read << " bytes\n";
strbuf.commit(read);
return read;
}
#include <boost/range/algorithm.hpp>
int main() {
// this is the tricky scenario:
asio::streambuf strbuf;
READ_UNTIL(strbuf, fake_read1);
//READ_UNTIL(strbuf, fake_read2); // will happen later, now we're stuck with a partial second frame
int data1=0, data2=0;
strbuf.consume(parseDocument(strbuf.data(), data1));
size_t consumed = 0;
while (!(consumed = parseDocument(strbuf.data(), data2))) {
READ_UNTIL(strbuf, fake_read2);
}
std::cout << "Yay: " << data1 << "\n";
std::cout << "Yay: " << data2 << "\n";
//asio::ip::tcp::iostream networkstream("localhost", "6767");
std::cout << asio::ip::tcp::iostream("localhost", "6767").rdbuf();
}
版画
READ_UNTIL fake_read1: 8 bytes
READ_UNTIL fake_read2: 4 bytes
Yay: 12345
Yay: 23456
总结,集成第 3 方解析器
如果您必须使用第三方库需要一个std::istream&
来解析,但您不能依赖传输与帧边界对齐,您也许可以使用混合方法:
auto n = find_frame_boundary(buffers_begin(sb.data()), buffers_end(sb.data()));
然后也许在检测到的缩小区域上使用 boost::iostream::array_source
。
虽然 read_until()
操作将读取的所有数据提交到 streambuf 的输入序列中,但它们 return 一个 bytes_transferred
值包含字节数直至并包括第一个定界符。本质上,它提供了帧的大小,并且可以通过以下任一方式限制 istream
仅读取 streambuf
输入序列的一部分:
- 使用自定义
istream
限制从 streambuf 读取的字节数。实现此目的的一种更简单的方法是使用 Boost.IOStream 的boost::iostreams::stream
and implement a model of the Source 概念。 - 创建一个派生自 Boost.Asio 的
streambuf
的自定义streambuf
。为了限制从可用输入序列中读取的字节数,自定义函数将需要处理输入序列的末尾。此外,自定义streambuf
将需要处理下溢。
为 Boost.IOStream
自定义Source
Boost.IOStream 的 boost::iostreams::stream
对象将 I/O 操作委托给设备。设备是实现各种 Boost.IOStream 概念模型的用户代码。在这种情况下,提供对一系列字符的读取访问的 Source 概念是唯一需要的概念。此外,当 boost::iostreams::stream
使用 Source Device 时,它将继承自 std::basic_istream
.
在下面的代码中,asio_streambuf_input_device
是从 Boost.Asio streambuf 中读取的 Source 概念的模型。当读取了给定数量的字节时,asio_streambuf_input_device
指示下溢,即使底层 streambuf 的输入序列中仍有数据。
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
// ...
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);
这里有一个完整的例子demonstrating这种方法:
#include <functional>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp> // boost::iostreams::source
#include <boost/iostreams/stream.hpp>
/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
: public boost::iostreams::source // Use convenience class.
{
public:
explicit
asio_streambuf_input_device(
boost::asio::streambuf& streambuf,
std::streamsize bytes_transferred
)
: streambuf_(streambuf),
bytes_remaining_(bytes_transferred)
{}
std::streamsize read(char_type* buffer, std::streamsize buffer_size)
{
// Determine max amount of bytes to copy.
auto bytes_to_copy =
std::min(bytes_remaining_, std::min(
static_cast<std::streamsize>(streambuf_.size()), buffer_size));
// If there is no more data to be read, indicate end-of-sequence per
// Source concept.
if (0 == bytes_to_copy)
{
return -1; // Indicate end-of-sequence, per Source concept.
}
// Copy from the streambuf into the provided buffer.
std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);
// Update bytes remaining.
bytes_remaining_ -= bytes_to_copy;
// Consume from the streambuf.
streambuf_.consume(bytes_to_copy);
return bytes_to_copy;
}
private:
boost::asio::streambuf& streambuf_;
std::streamsize bytes_remaining_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
boost::asio::streambuf streambuf;
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(
streambuf, bytes_transferred);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
输出:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
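Derive from boost::asio::streambuf
It is safe to derive from Boost.Asio's streambuf and implement custom behavior. In this case, the goal is to limit the number of bytes an istream can extract from the input sequence before causing an underflow. This can be done by:
- Updating the streambuf's get area (input sequence) pointers so that they cover only the desired number of bytes. This is done by setting the end-of-get-area pointer (egptr) to n bytes past the current get-area pointer (gptr). In the code below, I refer to this as framing.
- Handling underflow(). If the end of the current frame has been reached, return EOF.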
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer.
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
// ...
basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();
Here is a complete example demonstrating this approach:
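#include <cassert>     // assert
#include <functional>  // std::bind
#include <iostream>
#include <limits>      // std::numeric_limits
#include <memory>      // std::allocator
#include <string>
#include <boost/asio.hpp>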
/// @brief Type that derives from Boost.Asio streambuf and can frame the
/// input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
: public boost::asio::basic_streambuf<Allocator>
{
private:
typedef boost::asio::basic_streambuf<Allocator> parent_type;
public:
explicit
basic_framed_streambuf(
std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
const Allocator& allocator = Allocator()
)
: parent_type(maximum_size, allocator),
egptr_(nullptr)
{}
/// @brief Limit the current input sequence to n characters.
///
/// @remark An active frame is invalidated by any member function that
/// modifies the input or output sequence.
void frame(std::streamsize n)
{
// Store actual end of input sequence.
egptr_ = this->egptr();
// Set the input sequence end to n characters from the current
// input sequence pointer.
this->setg(this->eback(), this->gptr(), this->gptr() + n);
}
/// @brief Restore the end of the input sequence.
void unframe()
{
// Restore the end of the input sequence.
this->setg(this->eback(), this->gptr(), this->egptr_);
egptr_ = nullptr;
}
protected:
// When the end of the input sequence has been reached, underflow
// will be invoked.
typename parent_type::int_type underflow()
{
// If the streambuf is currently framed, then return eof
// on underflow. Otherwise, defer to the parent implementation.
return egptr_ ? parent_type::traits_type::eof()
: parent_type::underflow();
}
private:
char* egptr_;
};
typedef basic_framed_streambuf<> framed_streambuf;
/// @brief RAII type that helps frame a basic_framed_streambuf within a
/// given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
: streambuf_(streambuf)
{
streambuf_.frame(n);
}
~streambuf_frame() { streambuf_.unframe(); }
streambuf_frame(const streambuf_frame&) = delete;
streambuf_frame& operator=(const streambuf_frame&) = delete;
private:
Streambuf& streambuf_;
};
/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
return std::string(buffers_begin(streambuf.data()),
buffers_end(streambuf.data()));
}
// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}
int main()
{
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
// Create all I/O objects.
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
tcp::socket server_socket(io_service);
tcp::socket client_socket(io_service);
// Connect client and server sockets.
acceptor.async_accept(server_socket, std::bind(&noop));
client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
io_service.run();
// Write to client.
const std::string message =
"12@"
"345@";
write(server_socket, boost::asio::buffer(message));
framed_streambuf streambuf;
// Demonstrate framing the streambuf's input sequence manually.
{
auto bytes_transferred = read_until(client_socket, streambuf, '@');
// Verify that the entire message "12@345@" was read into
// streambuf's input sequence.
assert(message.size() == streambuf.size());
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter.
streambuf.frame(bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
int data = 0;
input >> data; // Consumes "12" from input sequence.
assert(data == 12);
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // Consume "@" from input sequence.
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// Restore the streambuf.
streambuf.unframe();
}
// Demonstrate using an RAII helper to frame the streambuf's input
// sequence.
{
// As the streambuf's input sequence already contains the delimiter,
// this operation will not actually attempt to read data from the
// socket.
auto bytes_transferred = read_until(client_socket, streambuf, '@');
std::cout << "streambuf contains: " << make_string(streambuf) <<
std::endl;
// Frame the streambuf based on bytes_transferred. This is all data
// up to and including the first delimiter. Use a frame RAII object
// to only frame the streambuf within the current scope.
streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);
// Use an istream to read data from the currently framed streambuf.
std::istream input(&streambuf);
std::string data;
getline(input, data, '@'); // Consumes delimiter.
assert(data == "345");
std::cout << "Extracted: " << data << std::endl;
assert(!input.eof());
input.get(); // No more data available in the frame, so underflow.
assert(input.eof());
std::cout << "istream has reached EOF" << std::endl;
// The frame object's destructor will unframe the streambuf.
}
assert(streambuf.size() == 0);
std::cout << "streambuf is empty" << std::endl;
}
Output:
streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
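The get-area trick itself does not depend on Boost.Asio. As a rough sketch using only the standard library, the hypothetical `framed_buffer` below (a name introduced here, not part of Boost) wraps a caller-owned character range, and the equally hypothetical `read_frames` frames each `@`-terminated message before handing it to an `std::istream`. Unlike the Asio version, no `underflow()` override is needed here, because `std::streambuf::underflow()` already returns EOF by default. The override above exists because the Asio streambuf's own `underflow()` would otherwise try to extend the get area.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <istream>
#include <streambuf>
#include <string>
#include <vector>

// Hypothetical stand-in for the framed streambuf: a read-only streambuf
// over a caller-owned character range.
class framed_buffer : public std::streambuf
{
public:
  framed_buffer(char* first, char* last) : end_(nullptr)
  {
    setg(first, first, last);
  }

  // Length of the next frame including the delimiter, or 0 if the
  // remaining input holds no complete frame.
  std::size_t next_frame(char delim) const
  {
    const char* last = end_ ? end_ : egptr();
    const char* pos =
        std::find(static_cast<const char*>(gptr()), last, delim);
    return pos == last ? 0 : static_cast<std::size_t>(pos - gptr()) + 1;
  }

  // Limit the get area to n characters from the current position.
  void frame(std::size_t n)
  {
    assert(n <= static_cast<std::size_t>(egptr() - gptr()));
    end_ = egptr();
    setg(eback(), gptr(), gptr() + n);
  }

  // Restore the real end of the get area.
  void unframe()
  {
    if (!end_) return;
    setg(eback(), gptr(), end_);
    end_ = nullptr;
  }

private:
  char* end_;  // Real end of the input while a frame is active.
};

// Parse every complete '@'-terminated integer frame found in `storage`.
std::vector<int> read_frames(std::string& storage)
{
  framed_buffer buf(&storage[0], &storage[0] + storage.size());
  std::vector<int> values;
  while (std::size_t n = buf.next_frame('@'))
  {
    buf.frame(n);
    std::istream input(&buf);
    int value = 0;
    if (input >> value) values.push_back(value);
    input.clear();  // Allow the discard below even after a failed parse.
    input.ignore(static_cast<std::streamsize>(n));  // Drop rest of frame.
    buf.unframe();
  }
  return values;
}
```

Calling `read_frames` on a string holding `12@345@67` would yield 12 and 345 and leave the incomplete `67` unparsed, since no delimiter follows it.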