将 Futures 与 Boost 线程池一起使用
Use Futures with Boost Thread Pool
我正在实现一个读取和发送文件和字符串的 TCP 客户端,并且我使用 Boost 作为我的主要库。我想在继续发送字符串的同时继续读取或发送文件,在这些情况下,字符串是发送到服务器的命令。为此,我考虑使用线程池以避免客户端过载。我的问题是,当池中的线程结束时,我可以使用期货来使用回调吗?如果我不能,还有其他解决方案吗?
我正在做这样的事情,其中 pool_
是 boost:asio:thread_pool
void send_file(std::string const& file_path){
boost::asio::post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}
void handle_send_file(std::string const& file_path) {
boost::array<char, 1024> buf{};
boost::system::error_code error;
std::ifstream source_file(file_path, std::ios_base::binary | std::ios_base::ate);
if(!source_file) {
std::cout << "[ERROR] Failed to open " << file_path << std::endl;
//TODO gestire errore
}
size_t file_size = source_file.tellg();
source_file.seekg(0);
std::string file_size_readable = file_size_to_readable(file_size);
// First send file name and file size in bytes to server
boost::asio::streambuf request;
std::ostream request_stream(&request);
request_stream << file_path << "\n"
<< file_size << "\n\n"; // Consider sending readable version, does it change anything?
// Send the request
boost::asio::write(*socket_, request, error);
if(error){
std::cout << "[ERROR] Send request error:" << error << std::endl;
//TODO lanciare un'eccezione? Qua dovrò controllare se il server funziona o no
}
if(DEBUG) {
std::cout << "[DEBUG] " << file_path << " size is: " << file_size_readable << std::endl;
std::cout << "[DEBUG] Start sending file content" << std::endl;
}
long bytes_sent = 0;
float percent = 0;
print_percentage(percent);
while(!source_file.eof()) {
source_file.read(buf.c_array(), (std::streamsize)buf.size());
int bytes_read_from_file = source_file.gcount(); //int is fine because i read at most buf's size, 1024 in this case
if(bytes_read_from_file<=0) {
std::cout << "[ERROR] Read file error" << std::endl;
break;
//TODO gestire questo errore
}
percent = std::ceil((100.0 * bytes_sent) / file_size);
print_percentage(percent);
boost::asio::write(*socket_, boost::asio::buffer(buf.c_array(), source_file.gcount()),
boost::asio::transfer_all(), error);
if(error) {
std::cout << "[ERROR] Send file error:" << error << std::endl;
//TODO lanciare un'eccezione?
}
bytes_sent += bytes_read_from_file;
}
std::cout << "\n" << "[INFO] File " << file_path << " sent successfully!" << std::endl;
}
发送到池的操作结束,线程未结束。这就是 池化 线程的全部目的。
void send_file(std::string const& file_path){
post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}
这有几个问题。最大的一个是您不应该通过引用捕获 file_path
,因为参数很快就会超出范围,并且 handle_send_file
调用将在另一个线程中的未指定时间 运行。那是竞争条件和悬空参考。 Undefined Behaviour 个结果。
然后
// DO SOMETHING WHEN handle_send_file ENDS
在与handle_send_file
没有顺序关系的一行上。事实上,它可能 运行 在 操作有机会开始之前。
简化
这是一个简化版本:
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
static asio::thread_pool pool_;
struct X {
std::unique_ptr<tcp::socket> socket_;
explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
socket_->connect({ {}, port });
}
asio::thread_pool pool_;
std::unique_ptr<tcp::socket> socket_{ new tcp::socket{ pool_ } };
void send_file(std::string file_path) {
post(pool_, [=, this] {
send_file_implementation(file_path);
// DO SOMETHING WHEN send_file_implementation ENDS
});
}
// throws system_error exception
void send_file_implementation(std::string file_path) {
std::ifstream source_file(file_path,
std::ios_base::binary | std::ios_base::ate);
size_t file_size = source_file.tellg();
source_file.seekg(0);
write(*socket_,
asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));
boost::array<char, 1024> buf{};
while (source_file.read(buf.c_array(), buf.size()) ||
source_file.gcount() > 0)
{
int n = source_file.gcount();
if (n <= 0) {
using namespace boost::system;
throw system_error(errc::io_error, system_category());
}
write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
}
}
};
现在,您确实可以 运行 并行执行这些操作中的几个(假设 X
的多个实例,因此您有单独的 socket_
个连接)。
要在最后做点什么,把代码放在我移动评论的地方:
// DO SOMETHING WHEN send_file_implementation ENDS
如果您不知道在那里做什么,并且希望在那个时候为未来做好准备,您可以:
std::future<void> send_file(std::string file_path) {
std::packaged_task<void()> task([=, this] {
send_file_implementation(file_path);
});
return post(pool_, std::move(task));
}
这 overload of post
神奇地¹ return 是打包任务的未来。该打包任务将使用 (void
) return 值或抛出的异常来设置内部承诺。
查看实际效果:Live On Coliru
int main() {
// send two files simultaneously to different connections
X clientA(6868);
X clientB(6969);
std::future<void> futures[] = {
clientA.send_file("main.cpp"),
clientB.send_file("main.cpp"),
};
for (auto& fut : futures) try {
fut.get();
std::cout << "Everything completed without error\n";
} catch(std::exception const& e) {
std::cout << "Error occurred: " << e.what() << "\n";
};
pool_.join();
}
我测试了这个,同时 运行宁两个 netcats 监听 6868/6969:
nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait
服务器打印:
Everything completed without error
Everything completed without error
netcat 打印它们的过滤输出:
main.cpp
1907
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef -
¹ 不是 magic:参见 https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html
我正在实现一个读取和发送文件和字符串的 TCP 客户端,并且我使用 Boost 作为我的主要库。我想在继续发送字符串的同时继续读取或发送文件,在这些情况下,字符串是发送到服务器的命令。为此,我考虑使用线程池以避免客户端过载。我的问题是,当池中的线程结束时,我可以使用期货来使用回调吗?如果我不能,还有其他解决方案吗?
我正在做这样的事情,其中 pool_
是 boost:asio:thread_pool
void send_file(std::string const& file_path){
boost::asio::post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}
void handle_send_file(std::string const& file_path) {
boost::array<char, 1024> buf{};
boost::system::error_code error;
std::ifstream source_file(file_path, std::ios_base::binary | std::ios_base::ate);
if(!source_file) {
std::cout << "[ERROR] Failed to open " << file_path << std::endl;
//TODO gestire errore
}
size_t file_size = source_file.tellg();
source_file.seekg(0);
std::string file_size_readable = file_size_to_readable(file_size);
// First send file name and file size in bytes to server
boost::asio::streambuf request;
std::ostream request_stream(&request);
request_stream << file_path << "\n"
<< file_size << "\n\n"; // Consider sending readable version, does it change anything?
// Send the request
boost::asio::write(*socket_, request, error);
if(error){
std::cout << "[ERROR] Send request error:" << error << std::endl;
//TODO lanciare un'eccezione? Qua dovrò controllare se il server funziona o no
}
if(DEBUG) {
std::cout << "[DEBUG] " << file_path << " size is: " << file_size_readable << std::endl;
std::cout << "[DEBUG] Start sending file content" << std::endl;
}
long bytes_sent = 0;
float percent = 0;
print_percentage(percent);
while(!source_file.eof()) {
source_file.read(buf.c_array(), (std::streamsize)buf.size());
int bytes_read_from_file = source_file.gcount(); //int is fine because i read at most buf's size, 1024 in this case
if(bytes_read_from_file<=0) {
std::cout << "[ERROR] Read file error" << std::endl;
break;
//TODO gestire questo errore
}
percent = std::ceil((100.0 * bytes_sent) / file_size);
print_percentage(percent);
boost::asio::write(*socket_, boost::asio::buffer(buf.c_array(), source_file.gcount()),
boost::asio::transfer_all(), error);
if(error) {
std::cout << "[ERROR] Send file error:" << error << std::endl;
//TODO lanciare un'eccezione?
}
bytes_sent += bytes_read_from_file;
}
std::cout << "\n" << "[INFO] File " << file_path << " sent successfully!" << std::endl;
}
发送到池的操作结束,线程未结束。这就是 池化 线程的全部目的。
void send_file(std::string const& file_path){
post(pool_, [this, &file_path] {
handle_send_file(file_path);
});
// DO SOMETHING WHEN handle_send_file ENDS
}
这有几个问题。最大的一个是您不应该通过引用捕获 file_path
,因为参数很快就会超出范围,并且 handle_send_file
调用将在另一个线程中的未指定时间 运行。那是竞争条件和悬空参考。 Undefined Behaviour 个结果。
然后
// DO SOMETHING WHEN handle_send_file ENDS
在与handle_send_file
没有顺序关系的一行上。事实上,它可能 运行 在 操作有机会开始之前。
简化
这是一个简化版本:
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
static asio::thread_pool pool_;
struct X {
std::unique_ptr<tcp::socket> socket_;
explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
socket_->connect({ {}, port });
}
asio::thread_pool pool_;
std::unique_ptr<tcp::socket> socket_{ new tcp::socket{ pool_ } };
void send_file(std::string file_path) {
post(pool_, [=, this] {
send_file_implementation(file_path);
// DO SOMETHING WHEN send_file_implementation ENDS
});
}
// throws system_error exception
void send_file_implementation(std::string file_path) {
std::ifstream source_file(file_path,
std::ios_base::binary | std::ios_base::ate);
size_t file_size = source_file.tellg();
source_file.seekg(0);
write(*socket_,
asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));
boost::array<char, 1024> buf{};
while (source_file.read(buf.c_array(), buf.size()) ||
source_file.gcount() > 0)
{
int n = source_file.gcount();
if (n <= 0) {
using namespace boost::system;
throw system_error(errc::io_error, system_category());
}
write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
}
}
};
现在,您确实可以 运行 并行执行这些操作中的几个(假设 X
的多个实例,因此您有单独的 socket_
个连接)。
要在最后做点什么,把代码放在我移动评论的地方:
// DO SOMETHING WHEN send_file_implementation ENDS
如果您不知道在那里做什么,并且希望在那个时候为未来做好准备,您可以:
std::future<void> send_file(std::string file_path) {
std::packaged_task<void()> task([=, this] {
send_file_implementation(file_path);
});
return post(pool_, std::move(task));
}
这 overload of post
神奇地¹ return 是打包任务的未来。该打包任务将使用 (void
) return 值或抛出的异常来设置内部承诺。
查看实际效果:Live On Coliru
int main() {
// send two files simultaneously to different connections
X clientA(6868);
X clientB(6969);
std::future<void> futures[] = {
clientA.send_file("main.cpp"),
clientB.send_file("main.cpp"),
};
for (auto& fut : futures) try {
fut.get();
std::cout << "Everything completed without error\n";
} catch(std::exception const& e) {
std::cout << "Error occurred: " << e.what() << "\n";
};
pool_.join();
}
我测试了这个,同时 运行宁两个 netcats 监听 6868/6969:
nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait
服务器打印:
Everything completed without error
Everything completed without error
netcat 打印它们的过滤输出:
main.cpp
1907
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef -
¹ 不是 magic:参见 https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html