如何使用 boost::beast 或 boost::asio 将内容异步写入文件?

How to write content to file asynchronously with boost::beast or boost::asio?

我在 websocket_client_async_ssl.cpp 的基础上修改了 on_read 函数,使其可以将收到的内容保存到本地文件:

// Excerpt of the question's session class: only the members used by the
// on_read variants below are shown; the rest is elided with `...`.
class session : public std::enable_shared_from_this<session>
{
    std::ofstream outfile_text;  // per the author, opened with outfile_text.open("test.txt", std::ofstream::out)
    const int MAX_LINE_COUNT;    // per the author, initialised to 10 (initialisation not shown)
    int current_line_;           // incremented once per received message by the on_read variants
...
}

// Read-completion handler (blocking-write variant).
//
// Appends the received message plus a trailing '\n' to outfile_text using a
// synchronous write, then re-arms the WebSocket read. Once MAX_LINE_COUNT
// messages have been written the file is closed and no further read is issued.
//
// ec: result of the read; on failure it is reported through fail() and the
//     handler returns without touching the file.
// The second parameter (bytes transferred) is unused.
void on_read_version2( beast::error_code ec, std::size_t)
{
  if (ec)
    return fail(ec, "read");

  // Copy the payload out before releasing it from the dynamic buffer.
  const std::string line = beast::buffers_to_string(buffer_.data());
  buffer_.consume(buffer_.size());

  // Blocking write; std::string::data() is already `const char*`, so no cast
  // (and in particular no const-stripping C-style cast) is needed.
  outfile_text.write(line.data(), static_cast<std::streamsize>(line.size()));
  outfile_text.put('\n');
  ++current_line_;

  // `>=`, not `>`: current_line_ now equals the number of lines written, so
  // stopping only when it exceeds the limit would write MAX_LINE_COUNT + 1
  // lines.
  if (current_line_ >= MAX_LINE_COUNT)
  {
    outfile_text.close();
    return;
  }

  // Re-read a message into our buffer. The handler bound here is
  // session::on_read — presumably the name this variant is installed under.
  ws_.async_read( buffer_, beast::bind_front_handler( &session::on_read, shared_from_this()));
}

// Read-completion handler (asynchronous-write sketch, still pseudo-code).
//
// Queues the received message for an asynchronous file write and re-arms the
// WebSocket read. The actual async write is left as a TODO.
//
// ec: result of the read; on failure it is reported through fail().
// The second parameter (bytes transferred) is unused.
void on_read_version3( beast::error_code ec, std::size_t)
{
  if (ec)
    return fail(ec, "read");

  ++current_line_;

  // Copy the message out BEFORE consuming it: consuming first empties the
  // buffer, so an empty string would be queued every time.
  queue_.push_back(beast::buffers_to_string(buffer_.data()));
  buffer_.consume(buffer_.size());

  // Are we already writing? Only the push that makes the queue non-empty
  // starts a write; otherwise a write is assumed to be in flight already.
  // Either way we must fall through and keep reading — returning here would
  // stop the read loop as soon as a write is pending.
  if (queue_.size() == 1)
  {
    // TODO: async_write to file from queue_
  }

  // NOTE(review): with the pre-increment above, `>` admits MAX_LINE_COUNT + 1
  // messages, and closing here can race with writes still queued — confirm
  // the intended limit and close the file from the write chain once drained.
  if (current_line_ > MAX_LINE_COUNT)
  {
    outfile_text.close();
    return;
  }

  // Re-read a message into our buffer
  ws_.async_read( buffer_, beast::bind_front_handler( &session::on_read, shared_from_this()));
}

在版本 2 中,我使用阻塞的方式将内容写入文件。在版本 3 中,我用伪代码列出了希望以异步方式实现这部分逻辑的思路。

问题> boost::asio 或 boost::beast 是否支持对文件进行 async_write?如果不支持,在 on_read 函数中将内容写入文件的最佳方法是什么?

谢谢

假设是 POSIX 系统,您可以使用 stream_descriptor:

// Wraps the file descriptor returned by ::creat("test.txt", 0755) in a
// stream_descriptor bound to the executor ex_.
net::posix::stream_descriptor  stream_{ex_, ::creat("test.txt", 0755)};

它满足 ASIO 的 AsyncStream 概念。

在 Windows 上也有类似的类型(例如 stream_handle)。

演示:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/beast.hpp>

#include <fcntl.h>

#include <chrono>
#include <cstddef>
#include <deque>
#include <exception>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <utility>

// Short aliases for the Boost namespaces used throughout this example.
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using net::ip::tcp;
// Enables duration literals such as `5s` (used in main).
using namespace std::chrono_literals;

// Reports a failed operation on stderr as "<msg>: <error message>".
// Does nothing when `ec` does not represent an error.
static inline void fail(beast::error_code ec, std::string_view msg) {
    if (!ec)
        return;

    std::cerr << msg << ": " << ec.message() << std::endl;
}

class session : public std::enable_shared_from_this<session> {
  public:
    session(net::any_io_executor ex) : ex_(ex) {}

    void start()
    {
        // assumed on logical strand
        ws_.next_layer().connect({{}, 8989});
        ws_.handshake("localhost", "/");
        do_read();
    }

  private:
    const int       MAX_LINE_COUNT = 10;
    int             current_line_  = 0;
    net::streambuf  buffer_;

    net::any_io_executor           ex_;
    net::posix::stream_descriptor  stream_{ex_, ::creat("test.txt", 0755)};
    websocket::stream<tcp::socket> ws_{ex_};
    std::deque<std::string>        queue_;

    void do_read() {
        // assumed on strand
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(&session::on_read, shared_from_this()));
    }

    void on_read(beast::error_code ec, std::size_t)
    {
        if (ec)
            return fail(ec, "read");

        ++current_line_; // TODO fixme count `\n` in buffer?

        enqueue_output(beast::buffers_to_string(buffer_.data()) + '\n');

        do_read();
    }

    bool file_full() const { 
        return current_line_ > MAX_LINE_COUNT;
    }

    void enqueue_output(std::string msg) {
        if (file_full())
            return;

        queue_.push_back(std::move(msg));
        buffer_.consume(buffer_.size());

        // Are we already writing?
        if (queue_.size() == 1)
            do_write_loop();
    }

    void do_write_loop()
    {
        if (queue_.empty()){
            if (file_full())
                stream_.close();

            return;
        }

        // async_write to file from queue_
        net::async_write(
            stream_, net::buffer(queue_.front()),
            [this, self = shared_from_this()](beast::error_code ec, size_t) {
                if (!ec) {
                    queue_.pop_front();
                    do_write_loop();
                } // TODO error handling
            });
    }
};

// Runs one session on a strand of a single io_context for at most 5 seconds.
// Returns 1 if setup or a handler throws (e.g. the synchronous connect or
// handshake in session::start fails), 0 otherwise.
int main()
{
    net::io_context io;

    try {
        std::make_shared<session>(net::make_strand(io.get_executor())) //
            ->start();

        io.run_for(5s);
    } catch (const std::exception& e) {
        // Without this, a refused connection would terminate the process via
        // an uncaught exception.
        std::cerr << "fatal: " << e.what() << '\n';
        return 1;
    }
}

以及使用 websocat 的现场演示:https://imgur.com/0TyHmBj