使用 boost-asio 时实时将缓冲区写入磁盘
real-time writes buffer to the disk when using boost-asio
我有一个由 boost.asio 编写的服务器。该服务器从客户端获取文件并将其写入磁盘。我只是有一个问题。当服务器获取文件时,它会在完全接收到文件后将其写入磁盘。我希望服务器以实时方式将缓冲区写入磁盘。例如,服务器将它从客户端获取的文件每 100kb 大小写入磁盘。我已经编写了以下代码,但我不知道如何编辑才能达到这个目标。
void Session::DoReadFileContent(size_t arg_bytes_transferred)
{
if (arg_bytes_transferred > 0)
{
m_outputFile.write(m_buffer.data(), static_cast<std::streamsize>(arg_bytes_transferred));
if (m_outputFile.tellp() >= static_cast<std::streamsize>(m_fileSize))
{
std::cout << "Received file: " << m_fileName << std::endl;
return;
}
}
auto self = shared_from_this();
m_socket.async_read_some(boost::asio::buffer(m_buffer.data(), m_buffer.size()),
[this, self](boost::system::error_code arg_error_code, size_t arg_bytes)
{
DoReadFileContent(arg_bytes);
});
}
首先,在这种情况下,读取明确大小的数据似乎比 read_some
读取任何可用数据更好。
在这种模式下,跟踪“剩余可接收字节数”比 m_fileSize
更容易。
这里有一些小的改组,使您的代码成为一个独立的示例。它期望服务器发送一行文本,给出负载大小和输出文件名,然后是该文件的内容。示例服务器可以是 运行 with netcat 例如:
(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6969
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
using boost::system::error_code;
using boost::asio::ip::tcp;
struct Session : std::enable_shared_from_this<Session> {
Session(boost::asio::io_context& io, uint16_t port)
: m_socket(io)
{
m_socket.connect({{}, port});
}
void Start();
void DoReadFileContent(size_t transferred = 0);
private:
std::array<char, 1024> m_buffer;
std::streamsize m_remainingSize = 0;
std::string m_fileName = "noname.dat";
std::ofstream m_outputFile;
tcp::socket m_socket;
};
void Session::Start() {
// Reading a size (in text for simplicity) and subsequently receive as many bytes
//
// I'm keeping this sync for simplicity, because you probably already have
// this coded somehwere
boost::asio::streambuf buf;
error_code ec;
auto n = read_until(m_socket, buf, "\n", ec);
std::istream is(&buf);
if (is >> m_remainingSize && getline(is, m_fileName)) {
std::cerr << "Protocol trace: n:" << n << ", fileName:" << m_fileName << " payload_size:" << m_remainingSize << "\n";
m_outputFile.exceptions(std::ios::failbit | std::ios::badbit);
m_outputFile.open(m_fileName, std::ios::binary);
// write excess buffer contents as part of payload
if (buf.size()) {
std::cerr << "Writing " << buf.size() << " bytes\n";
m_remainingSize -= buf.size();
m_outputFile << &buf;
}
DoReadFileContent();
} else {
std::cerr << "Protocol error, payload_size expected\n";
}
}
void Session::DoReadFileContent(size_t transferred) {
if (transferred > 0) {
std::cerr << "Writing " << transferred << " bytes\n";
m_remainingSize -= transferred;
m_outputFile.write(m_buffer.data(), transferred);
}
if (m_remainingSize <= 0) {
std::cout << "Completed file: " << m_fileName << std::endl;
return;
}
auto self = shared_from_this();
auto expect = std::min(size_t(m_remainingSize), m_buffer.size());
std::cout << "Trying to receive next " << expect << " bytes" << std::endl;
async_read(m_socket,
boost::asio::buffer(m_buffer.data(), expect),
[this, self](error_code ec, size_t arg_bytes) {
std::cerr << "async_read: " << ec.message() << " - " << arg_bytes << " bytes\n";
if (!ec) {
DoReadFileContent(arg_bytes);
}
});
}
int main() {
boost::asio::io_context io;
std::make_shared<Session>(io, 6868) // download from port 6868
->Start();
io.run(); // complete
}
测试
(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6868&
./a.out
md5sum main.cpp output.dat
打印,例如:
Protocol trace: n:15, fileName:output.dat payload_size:2654
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 606 bytes
async_read: Success - 606 bytes
Writing 606 bytes
Completed file: output.dat
最后两行
b4eec7203f6a1dcbfbf3d298c7ec0832 main.cpp
b4eec7203f6a1dcbfbf3d298c7ec0832 output.dat
表示接收到的文件与原文件相同
备注:
数据包在我的系统上以未指定的大小传送,例如相同的文件被接收为:
Protocol trace: n:15, fileName:output.dat payload_size:2654
Writing 497 bytes
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 109 bytes
async_read: Success - 109 bytes
Writing 109 bytes
Completed file: output.dat
b4eec7203f6a1dcbfbf3d298c7ec0832 main.cpp
b4eec7203f6a1dcbfbf3d298c7ec0832 output.dat
Note that it starts out with 497 bytes already in the input buffer from the read_until
.
- 协议不安全:
- 应该验证文件名。想象一下如果文件是 '/home/sehe/myimportant_file.txt' 或更糟的情况,比如说 /dev/sde1 并且我们有权进行原始块设备访问...
- 您可能想为 streambuf 指定最大大小,这样如果您得到一个永远不会发送
'\n'
的模糊器,您就不会吞噬所有 RAM
- 文件IO的错误处理很粗糙。我使用了 io 异常,但你可能想在不同的地方检查
m_outputFile.good()
而不是
我有一个由 boost.asio 编写的服务器。该服务器从客户端获取文件并将其写入磁盘。我只是有一个问题。当服务器获取文件时,它会在完全接收到文件后将其写入磁盘。我希望服务器以实时方式将缓冲区写入磁盘。例如,服务器将它从客户端获取的文件每 100kb 大小写入磁盘。我已经编写了以下代码,但我不知道如何编辑才能达到这个目标。
void Session::DoReadFileContent(size_t arg_bytes_transferred)
{
if (arg_bytes_transferred > 0)
{
m_outputFile.write(m_buffer.data(), static_cast<std::streamsize>(arg_bytes_transferred));
if (m_outputFile.tellp() >= static_cast<std::streamsize>(m_fileSize))
{
std::cout << "Received file: " << m_fileName << std::endl;
return;
}
}
auto self = shared_from_this();
m_socket.async_read_some(boost::asio::buffer(m_buffer.data(), m_buffer.size()),
[this, self](boost::system::error_code arg_error_code, size_t arg_bytes)
{
DoReadFileContent(arg_bytes);
});
}
首先,在这种情况下,读取明确大小的数据似乎比 read_some
读取任何可用数据更好。
在这种模式下,跟踪“剩余可接收字节数”比 m_fileSize
更容易。
这里有一些小的改组,使您的代码成为一个独立的示例。它期望服务器发送一行文本,给出负载大小和输出文件名,然后是该文件的内容。示例服务器可以是 运行 with netcat 例如:
(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6969
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
using boost::system::error_code;
using boost::asio::ip::tcp;
struct Session : std::enable_shared_from_this<Session> {
Session(boost::asio::io_context& io, uint16_t port)
: m_socket(io)
{
m_socket.connect({{}, port});
}
void Start();
void DoReadFileContent(size_t transferred = 0);
private:
std::array<char, 1024> m_buffer;
std::streamsize m_remainingSize = 0;
std::string m_fileName = "noname.dat";
std::ofstream m_outputFile;
tcp::socket m_socket;
};
void Session::Start() {
// Reading a size (in text for simplicity) and subsequently receive as many bytes
//
// I'm keeping this sync for simplicity, because you probably already have
// this coded somehwere
boost::asio::streambuf buf;
error_code ec;
auto n = read_until(m_socket, buf, "\n", ec);
std::istream is(&buf);
if (is >> m_remainingSize && getline(is, m_fileName)) {
std::cerr << "Protocol trace: n:" << n << ", fileName:" << m_fileName << " payload_size:" << m_remainingSize << "\n";
m_outputFile.exceptions(std::ios::failbit | std::ios::badbit);
m_outputFile.open(m_fileName, std::ios::binary);
// write excess buffer contents as part of payload
if (buf.size()) {
std::cerr << "Writing " << buf.size() << " bytes\n";
m_remainingSize -= buf.size();
m_outputFile << &buf;
}
DoReadFileContent();
} else {
std::cerr << "Protocol error, payload_size expected\n";
}
}
void Session::DoReadFileContent(size_t transferred) {
if (transferred > 0) {
std::cerr << "Writing " << transferred << " bytes\n";
m_remainingSize -= transferred;
m_outputFile.write(m_buffer.data(), transferred);
}
if (m_remainingSize <= 0) {
std::cout << "Completed file: " << m_fileName << std::endl;
return;
}
auto self = shared_from_this();
auto expect = std::min(size_t(m_remainingSize), m_buffer.size());
std::cout << "Trying to receive next " << expect << " bytes" << std::endl;
async_read(m_socket,
boost::asio::buffer(m_buffer.data(), expect),
[this, self](error_code ec, size_t arg_bytes) {
std::cerr << "async_read: " << ec.message() << " - " << arg_bytes << " bytes\n";
if (!ec) {
DoReadFileContent(arg_bytes);
}
});
}
int main() {
boost::asio::io_context io;
std::make_shared<Session>(io, 6868) // download from port 6868
->Start();
io.run(); // complete
}
测试
(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6868&
./a.out
md5sum main.cpp output.dat
打印,例如:
Protocol trace: n:15, fileName:output.dat payload_size:2654
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 606 bytes
async_read: Success - 606 bytes
Writing 606 bytes
Completed file: output.dat
最后两行
b4eec7203f6a1dcbfbf3d298c7ec0832 main.cpp
b4eec7203f6a1dcbfbf3d298c7ec0832 output.dat
表示接收到的文件与原文件相同
备注:
数据包在我的系统上以未指定的大小传送,例如相同的文件被接收为:
Protocol trace: n:15, fileName:output.dat payload_size:2654 Writing 497 bytes Trying to receive next 1024 bytes async_read: Success - 1024 bytes Writing 1024 bytes Trying to receive next 1024 bytes async_read: Success - 1024 bytes Writing 1024 bytes Trying to receive next 109 bytes async_read: Success - 109 bytes Writing 109 bytes Completed file: output.dat b4eec7203f6a1dcbfbf3d298c7ec0832 main.cpp b4eec7203f6a1dcbfbf3d298c7ec0832 output.dat
Note that it starts out with 497 bytes already in the input buffer from the
read_until
.
- 协议不安全:
- 应该验证文件名。想象一下如果文件是 '/home/sehe/myimportant_file.txt' 或更糟的情况,比如说 /dev/sde1 并且我们有权进行原始块设备访问...
- 您可能想为 streambuf 指定最大大小,这样如果您得到一个永远不会发送
'\n'
的模糊器,您就不会吞噬所有 RAM
- 文件IO的错误处理很粗糙。我使用了 io 异常,但你可能想在不同的地方检查
m_outputFile.good()
而不是