如何使用 boost::beast 或 boost::asio 将内容异步写入文件?
How to write content to file asynchronously with boost::beast or boost::asio?
我在 websocket_client_async_ssl.cpp 的基础上修改了 on_read 的功能,使其可以将内容保存到本地文件:
// Excerpt of the session class from the question; the remaining members are
// elided ("...") and are not shown here.
class session : public std::enable_shared_from_this<session>
{
// Output file stream. Per the author's note it is opened elsewhere with:
// outfile_text.open("test.txt", std::ofstream::out);
std::ofstream outfile_text;
// Line limit compared against current_line_ by the read handlers; the
// author's note gives its value as 10 (the initialization itself is not shown).
const int MAX_LINE_COUNT;
// Incremented once per received message by the read handlers; its initial
// value is not shown in this excerpt.
int current_line_;
...
}
// Read-completion handler, version 2: writes each received message to
// outfile_text with blocking stream I/O, one message per line.
//
// On error the failure is reported through fail() and nothing else happens.
// Once current_line_ exceeds MAX_LINE_COUNT the file is closed and no further
// read is started; otherwise the next asynchronous read is issued.
void on_read_version2(beast::error_code ec, std::size_t)
{
    if (ec)
        return fail(ec, "read");

    current_line_ += 1;

    // Copy the payload out of the buffer, then release it so the buffer is
    // empty for the next message.
    const std::string message = beast::buffers_to_string(buffer_.data());
    buffer_.consume(buffer_.size());

    // Blocking write: the message followed by a line terminator.
    outfile_text.write(message.data(), message.size());
    outfile_text.write("\n", 1);

    const bool limit_exceeded = current_line_ > MAX_LINE_COUNT;
    if (limit_exceeded) {
        outfile_text.close();
        return;
    }

    // Read the next message into our buffer. Note that the handler bound here
    // is session::on_read, exactly as in the original snippet.
    ws_.async_read(
        buffer_,
        beast::bind_front_handler(&session::on_read, shared_from_this()));
}
// Read-completion handler, version 3 (pseudo-code from the question): queues
// each received message so that it can be written to the file asynchronously.
// The asynchronous write itself is still a placeholder.
//
// On error the failure is reported through fail() and nothing else happens.
void on_read_version3(beast::error_code ec, std::size_t)
{
    if (ec)
        return fail(ec, "read");

    ++current_line_;

    // Copy the message out of the buffer BEFORE consuming it. Consuming first
    // (as the original did) empties the buffer, so an empty string would be
    // queued for every message.
    queue_.push_back(beast::buffers_to_string(buffer_.data()));
    buffer_.consume(buffer_.size());

    // Are we already writing?
    // NOTE(review): returning here also skips re-arming the read below, so no
    // further messages are received while a write is pending — confirm that
    // this is intended once the real async write is filled in.
    if (queue_.size() > 1)
        return;

    // TODO: async_write to file from queue_ (placeholder, not implemented)

    if (current_line_ > MAX_LINE_COUNT)
    {
        outfile_text.close();
        return;
    }

    // Re-read a message into our buffer
    ws_.async_read(
        buffer_,
        beast::bind_front_handler(&session::on_read, shared_from_this()));
}
在版本 2 中,我使用阻塞的方式将内容写入文件。在版本 3 中,我列出了希望用异步方式实现这部分逻辑的伪代码。
问题>
boost::asio 或 boost::beast 是否支持对文件执行 async_write?
如果不支持,在 on_read 函数中将内容写入文件的最佳方法是什么?
谢谢
假设是 POSIX 环境,您可以使用 stream_descriptor:
net::posix::stream_descriptor stream_{ex_, ::creat("test.txt", 0755)};
该类型满足 Asio 的 AsyncStream 概念。
在 Windows 上也有类似的类型(包括 stream_handle)。
演示:
#include <boost/asio.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/beast.hpp>
#include <chrono>
#include <deque>
#include <fstream>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;
using namespace std::chrono_literals;
// Reports a failed operation on standard error as "<msg>: <error message>".
// Does nothing when `ec` does not hold an error.
static inline void fail(beast::error_code ec, std::string_view msg) {
    if (!ec)
        return;

    std::cerr << msg << ": " << ec.message();
    std::cerr << std::endl;
}
class session : public std::enable_shared_from_this<session> {
public:
session(net::any_io_executor ex) : ex_(ex) {}
void start()
{
// assumed on logical strand
ws_.next_layer().connect({{}, 8989});
ws_.handshake("localhost", "/");
do_read();
}
private:
const int MAX_LINE_COUNT = 10;
int current_line_ = 0;
net::streambuf buffer_;
net::any_io_executor ex_;
net::posix::stream_descriptor stream_{ex_, ::creat("test.txt", 0755)};
websocket::stream<tcp::socket> ws_{ex_};
std::deque<std::string> queue_;
void do_read() {
// assumed on strand
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t)
{
if (ec)
return fail(ec, "read");
++current_line_; // TODO fixme count `\n` in buffer?
enqueue_output(beast::buffers_to_string(buffer_.data()) + '\n');
do_read();
}
bool file_full() const {
return current_line_ > MAX_LINE_COUNT;
}
void enqueue_output(std::string msg) {
if (file_full())
return;
queue_.push_back(std::move(msg));
buffer_.consume(buffer_.size());
// Are we already writing?
if (queue_.size() == 1)
do_write_loop();
}
void do_write_loop()
{
if (queue_.empty()){
if (file_full())
stream_.close();
return;
}
// async_write to file from queue_
net::async_write(
stream_, net::buffer(queue_.front()),
[this, self = shared_from_this()](beast::error_code ec, size_t) {
if (!ec) {
queue_.pop_front();
do_write_loop();
} // TODO error handling
});
}
};
// Demo entry point: creates one session on a strand of the io_context, starts
// it, and runs the event loop for five seconds.
int main()
{
    net::io_context io;

    {
        // The local handle is dropped right after start(); from then on the
        // session is kept alive only by the handlers it has registered.
        const auto client =
            std::make_shared<session>(net::make_strand(io.get_executor()));
        client->start();
    }

    io.run_for(std::chrono::seconds{5});
}
以及使用 websocat 的现场演示:https://imgur.com/0TyHmBj
我在 websocket_client_async_ssl.cpp 的基础上修改了 on_read 的功能,使其可以将内容保存到本地文件:
// Excerpt of the session class from the question; the remaining members are
// elided ("...") and are not shown here.
class session : public std::enable_shared_from_this<session>
{
// Output file stream. Per the author's note it is opened elsewhere with:
// outfile_text.open("test.txt", std::ofstream::out);
std::ofstream outfile_text;
// Line limit compared against current_line_ by the read handlers; the
// author's note gives its value as 10 (the initialization itself is not shown).
const int MAX_LINE_COUNT;
// Incremented once per received message by the read handlers; its initial
// value is not shown in this excerpt.
int current_line_;
...
}
// Read-completion handler, version 2: writes each received message to
// outfile_text with blocking stream I/O, one message per line.
//
// On error the failure is reported through fail() and nothing else happens.
// Once current_line_ exceeds MAX_LINE_COUNT the file is closed and no further
// read is started; otherwise the next asynchronous read is issued.
void on_read_version2(beast::error_code ec, std::size_t)
{
    if (ec)
        return fail(ec, "read");

    current_line_ += 1;

    // Copy the payload out of the buffer, then release it so the buffer is
    // empty for the next message.
    const std::string message = beast::buffers_to_string(buffer_.data());
    buffer_.consume(buffer_.size());

    // Blocking write: the message followed by a line terminator.
    outfile_text.write(message.data(), message.size());
    outfile_text.write("\n", 1);

    const bool limit_exceeded = current_line_ > MAX_LINE_COUNT;
    if (limit_exceeded) {
        outfile_text.close();
        return;
    }

    // Read the next message into our buffer. Note that the handler bound here
    // is session::on_read, exactly as in the original snippet.
    ws_.async_read(
        buffer_,
        beast::bind_front_handler(&session::on_read, shared_from_this()));
}
// Read-completion handler, version 3 (pseudo-code from the question): queues
// each received message so that it can be written to the file asynchronously.
// The asynchronous write itself is still a placeholder.
//
// On error the failure is reported through fail() and nothing else happens.
void on_read_version3(beast::error_code ec, std::size_t)
{
    if (ec)
        return fail(ec, "read");

    ++current_line_;

    // Copy the message out of the buffer BEFORE consuming it. Consuming first
    // (as the original did) empties the buffer, so an empty string would be
    // queued for every message.
    queue_.push_back(beast::buffers_to_string(buffer_.data()));
    buffer_.consume(buffer_.size());

    // Are we already writing?
    // NOTE(review): returning here also skips re-arming the read below, so no
    // further messages are received while a write is pending — confirm that
    // this is intended once the real async write is filled in.
    if (queue_.size() > 1)
        return;

    // TODO: async_write to file from queue_ (placeholder, not implemented)

    if (current_line_ > MAX_LINE_COUNT)
    {
        outfile_text.close();
        return;
    }

    // Re-read a message into our buffer
    ws_.async_read(
        buffer_,
        beast::bind_front_handler(&session::on_read, shared_from_this()));
}
在版本 2 中,我使用阻塞的方式将内容写入文件。在版本 3 中,我列出了希望用异步方式实现这部分逻辑的伪代码。
问题>
boost::asio 或 boost::beast 是否支持对文件执行 async_write?
如果不支持,在 on_read 函数中将内容写入文件的最佳方法是什么?
谢谢
假设是 POSIX 环境,您可以使用 stream_descriptor:
net::posix::stream_descriptor stream_{ex_, ::creat("test.txt", 0755)};
该类型满足 Asio 的 AsyncStream 概念。
在 Windows 上也有类似的类型(包括 stream_handle)。
演示:
#include <boost/asio.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/beast.hpp>
#include <chrono>
#include <deque>
#include <fstream>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;
using namespace std::chrono_literals;
// Reports a failed operation on standard error as "<msg>: <error message>".
// Does nothing when `ec` does not hold an error.
static inline void fail(beast::error_code ec, std::string_view msg) {
    if (!ec)
        return;

    std::cerr << msg << ": " << ec.message();
    std::cerr << std::endl;
}
class session : public std::enable_shared_from_this<session> {
public:
session(net::any_io_executor ex) : ex_(ex) {}
void start()
{
// assumed on logical strand
ws_.next_layer().connect({{}, 8989});
ws_.handshake("localhost", "/");
do_read();
}
private:
const int MAX_LINE_COUNT = 10;
int current_line_ = 0;
net::streambuf buffer_;
net::any_io_executor ex_;
net::posix::stream_descriptor stream_{ex_, ::creat("test.txt", 0755)};
websocket::stream<tcp::socket> ws_{ex_};
std::deque<std::string> queue_;
void do_read() {
// assumed on strand
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t)
{
if (ec)
return fail(ec, "read");
++current_line_; // TODO fixme count `\n` in buffer?
enqueue_output(beast::buffers_to_string(buffer_.data()) + '\n');
do_read();
}
bool file_full() const {
return current_line_ > MAX_LINE_COUNT;
}
void enqueue_output(std::string msg) {
if (file_full())
return;
queue_.push_back(std::move(msg));
buffer_.consume(buffer_.size());
// Are we already writing?
if (queue_.size() == 1)
do_write_loop();
}
void do_write_loop()
{
if (queue_.empty()){
if (file_full())
stream_.close();
return;
}
// async_write to file from queue_
net::async_write(
stream_, net::buffer(queue_.front()),
[this, self = shared_from_this()](beast::error_code ec, size_t) {
if (!ec) {
queue_.pop_front();
do_write_loop();
} // TODO error handling
});
}
};
// Demo entry point: creates one session on a strand of the io_context, starts
// it, and runs the event loop for five seconds.
int main()
{
    net::io_context io;

    {
        // The local handle is dropped right after start(); from then on the
        // session is kept alive only by the handlers it has registered.
        const auto client =
            std::make_shared<session>(net::make_strand(io.get_executor()));
        client->start();
    }

    io.run_for(std::chrono::seconds{5});
}
以及使用 websocat 的现场演示:https://imgur.com/0TyHmBj