如何在asio中批量发送未发送的消息
How to batch send unsent messages in asio
asio 中有一个例子,将待发送的消息缓存在一个双端队列(deque)中。当这个 deque 中未发送的消息太多时(比如 1000 条),我想通过 ConstBufferSequence 把它们批量发送出去。下面的代码应该怎么改?谢谢!
// Appends msg to the outgoing queue and, if the queue was empty beforehand,
// starts an async_write for the message now at the front.
void deliver(const chat_message& msg)
{
    // The original treats a non-empty queue as "a write is already in
    // progress"; in that case the message is only queued here.
    const bool queue_was_idle = write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!queue_was_idle)
        return;

    auto& head = write_msgs_.front();
    boost::asio::async_write(socket_,
        boost::asio::buffer(head.data(), head.length()),
        boost::bind(&chat_session::handle_write, shared_from_this(),
            boost::asio::placeholders::error));
}
// Completion handler for the async_write started by deliver() or by a previous
// handle_write(): on success pops the sent message and writes the next one, on
// failure removes this session from the room.
void handle_write(const boost::system::error_code& error)
{
    if (error)
    {
        room_.leave(shared_from_this());
        return;
    }

    // The front element is the message whose write just completed.
    write_msgs_.pop_front();
    if (write_msgs_.empty())
        return;

    auto& next = write_msgs_.front();
    boost::asio::async_write(socket_,
        boost::asio::buffer(next.data(), next.length()),
        boost::bind(&chat_session::handle_write, shared_from_this(),
            boost::asio::placeholders::error));
}
您可以将双端队列转换为任何对 const 缓冲区序列概念进行建模的容器:
std::vector<asio::const_buffer> buffers;
std::transform(
begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
[](Message const& s) { return asio::buffer(s); });
async_write( //
socket_, buffers,
[this, self = shared_from_this()] //
(error_code ec, std::size_t bytes_written) {
// ...
write_msgs_.clear();
});
这里使用 std::transform 只是出于习惯,你可能更喜欢下面的写法:
// Loop-based equivalent: one const_buffer view per queued message.
std::vector<asio::const_buffer> buffers;
for (auto& queued : write_msgs_) {
    buffers.emplace_back(asio::buffer(queued));
}
现场演示
根据这个最近的例子修改:
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
using Message = std::string;
/// One TCP connection of the demo server.
///
/// Reads NUL-terminated messages from the socket and answers each one with a
/// burst of response lines. Outgoing messages are kept in a deque; when more
/// than `batch_threshold` of them are pending they are sent with a single
/// scatter-gather async_write instead of one write per message.
///
/// No locking is used: the queue is only touched from handlers posted to the
/// socket's executor, which is assumed to be a strand (see do_write).
class chat_session : public std::enable_shared_from_this<chat_session> {
  public:
    chat_session(tcp::socket socket) : socket_(std::move(socket)) {}

    /// Starts the read loop. The session keeps itself alive through the
    /// shared_ptr captured in its pending handlers.
    void start() { do_read(); }

    /// Queues several messages for sending, preserving their order.
    /// Safe to call from any thread: the work is posted to the socket's executor.
    void deliver_many(std::vector<Message> msgs) {
        post(socket_.get_executor(),
             [this, msgs = std::move(msgs), self = shared_from_this()] //
             () mutable {
                 for (auto& msg : msgs) {
                     do_write(std::move(msg));
                 }
             });
    }

    /// Queues a single message for sending.
    /// Safe to call from any thread: the work is posted to the socket's executor.
    void deliver(Message msg) {
        post(socket_.get_executor(),
             [this, msg = std::move(msg), self = shared_from_this()] //
             () mutable { do_write(std::move(msg)); });
    }

  private:
    /// Pending-message count above which the whole queue is written in one call.
    static constexpr std::size_t batch_threshold = 100;

    /// Reads up to and including the next NUL byte, hands the message (without
    /// the terminator) to process_message, then re-arms itself.
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(incoming_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    // `length` counts the delimiter, hence the `- 1`.
                    process_message(incoming_.substr(0, length - 1));
                    incoming_.erase(0, length);
                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    /// Appends a message to the queue and starts the write loop if the queue
    /// was empty (a non-empty queue means a write is already in flight).
    void do_write(Message message)
    {
        write_msgs_.push_back(std::move(message)); // assumed on (implicit) strand
        if (write_msgs_.size() == 1) {
            write_loop();
        }
    }

    /// Sends the pending messages: one at a time while the backlog is small,
    /// or the entire backlog in one gathered write once it exceeds
    /// `batch_threshold`. Re-invokes itself from the completion handler until
    /// the queue is drained.
    void write_loop() {
        std::cerr << "write_loop with write_msgs_.size() = " << write_msgs_.size() << std::endl;
        if (write_msgs_.empty())
            return;

        if (write_msgs_.size() > batch_threshold) {
            // The buffers only reference the strings; the strings stay in
            // write_msgs_ until the completion handler removes them.
            std::vector<asio::const_buffer> buffers;
            buffers.reserve(write_msgs_.size());
            std::transform(
                begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
                [](Message const& s) { return asio::buffer(s); });

            async_write( //
                socket_, buffers,
                [this, self = shared_from_this(), batched = buffers.size()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        // Erase only what was sent: messages queued while the
                        // batch was in flight sit behind the first `batched`
                        // entries and must not be discarded.
                        write_msgs_.erase(write_msgs_.begin(),
                                          write_msgs_.begin() + batched);
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        } else {
            async_write( //
                socket_, asio::buffer(write_msgs_.front()),
                [this, self = shared_from_this()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        write_msgs_.pop_front();
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        }
    }

    /// Demo request handler: produces 200 response lines for one incoming message.
    void process_message(Message const& message) {
        std::vector<Message> responses;
        responses.reserve(200);
        for (int i = 0; i < 200; ++i) {
            responses.push_back("Response #" + std::to_string(i) + " for " +
                                message + "\n");
        }
        // deliver_many() posts to the socket's executor itself, so it is safe
        // even if this ran on a different thread (not the case in this example).
        deliver_many(std::move(responses));
    }

    tcp::socket socket_;
    Message incoming_;               // bytes received but not yet consumed
    std::deque<Message> write_msgs_; // outgoing queue; front is being written
};
class server {
public:
server(asio::any_io_executor ex, unsigned short port)
: acceptor_(ex, tcp::endpoint(tcp::v4(), port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
make_strand(acceptor_.get_executor()),
[this](error_code ec, tcp::socket&& s) {
if (!ec) {
std::cout << "Accepted " << s.remote_endpoint() << std::endl;
std::make_shared<chat_session>(std::move(s))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main() {
asio::thread_pool ctx;
server s(ctx.get_executor(), 8989);
ctx.join();
}
从客户端发送单个消息时:
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out&
sleep .5; printf 'HelloWorld\0' | nc 127.0.0.1 8989 -w1
显示例如:
Accepted 127.0.0.1:39538
write_loop with write_msgs_.size() = 1
Response #0 for HelloWorld
write_loop with write_msgs_.size() = 199
Response #1 for HelloWorld
Response #2 for HelloWorld
Response #3 for HelloWorld
Response #4 for HelloWorld
Response #5 for HelloWorld
Response #6 for HelloWorld
Response #7 for HelloWorld
Response #8 for HelloWorld
Response #9 for HelloWorld
Response #10 for HelloWorld
Response #11 for HelloWorld
Response #12 for HelloWorld
Response #13 for HelloWorld
Response #14 for HelloWorld
Response #15 for HelloWorld
Response #16 for HelloWorld
Response #17 for HelloWorld
Response #18 for HelloWorld
Response #19 for HelloWorld
Response #20 for HelloWorld
Response #21 for HelloWorld
Response #22 for HelloWorld
Response #23 for HelloWorld
Response #24 for HelloWorld
Response #25 for HelloWorld
Response #26 for HelloWorld
Response #27 for HelloWorld
Response #28 for HelloWorld
Response #29 for HelloWorld
Response #30 for HelloWorld
Response #31 for HelloWorld
Response #32 for HelloWorld
Response #33 for HelloWorld
Response #34 for HelloWorld
Response #35 for HelloWorld
Response #36 for HelloWorld
Response #37 for HelloWorld
Response #38 for HelloWorld
Response #39 for HelloWorld
Response #40 for HelloWorld
Response #41 for HelloWorld
Response #42 for HelloWorld
Response #43 for HelloWorld
Response #44 for HelloWorld
Response #45 for HelloWorld
Response #46 for HelloWorld
Response #47 for HelloWorld
Response #48 for HelloWorld
Response #49 for HelloWorld
Response #50 for HelloWorld
Response #51 for HelloWorld
Response #52 for HelloWorld
Response #53 for HelloWorld
Response #54 for HelloWorld
Response #55 for HelloWorld
Response #56 for HelloWorld
Response #57 for HelloWorld
Response #58 for HelloWorld
Response #59 for HelloWorld
Response #60 for HelloWorld
Response #61 for HelloWorld
Response #62 for HelloWorld
Response #63 for HelloWorld
Response #64 for HelloWorld
Response #65 for HelloWorld
Response #66 for HelloWorld
Response #67 for HelloWorld
Response #68 for HelloWorld
Response #69 for HelloWorld
Response #70 for HelloWorld
Response #71 for HelloWorld
Response #72 for HelloWorld
Response #73 for HelloWorld
Response #74 for HelloWorld
Response #75 for HelloWorld
Response #76 for HelloWorld
Response #77 for HelloWorld
Response #78 for HelloWorld
Response #79 for HelloWorld
Response #80 for HelloWorld
Response #81 for HelloWorld
Response #82 for HelloWorld
Response #83 for HelloWorld
Response #84 for HelloWorld
Response #85 for HelloWorld
Response #86 for HelloWorld
Response #87 for HelloWorld
Response #88 for HelloWorld
Response #89 for HelloWorld
Response #90 for HelloWorld
Response #91 for HelloWorld
Response #92 for HelloWorld
Response #93 for HelloWorld
Response #94 for HelloWorld
Response #95 for HelloWorld
Response #96 for HelloWorld
Response #97 for HelloWorld
Response #98 for HelloWorld
Response #99 for HelloWorld
Response #100 for HelloWorld
Response #101 for HelloWorld
Response #102 for HelloWorld
Response #103 for HelloWorld
Response #104 for HelloWorld
Response #105 for HelloWorld
Response #106 for HelloWorld
Response #107 for HelloWorld
Response #108 for HelloWorld
Response #109 for HelloWorld
Response #110 for HelloWorld
Response #111 for HelloWorld
Response #112 for HelloWorld
Response #113 for HelloWorld
Response #114 for HelloWorld
Response #115 for HelloWorld
Response #116 for HelloWorld
Response #117 for HelloWorld
Response #118 for HelloWorld
Response #119 for HelloWorld
Response #120 for HelloWorld
Response #121 for HelloWorld
Response #122 for HelloWorld
Response #123 for HelloWorld
Response #124 for HelloWorld
Response #125 for HelloWorld
Response #126 for HelloWorld
Response #127 for HelloWorld
Response #128 for HelloWorld
Response #129 for HelloWorld
Response #130 for HelloWorld
Response #131 for HelloWorld
Response #132 for HelloWorld
Response #133 for HelloWorld
Response #134 for HelloWorld
Response #135 for HelloWorld
Response #136 for HelloWorld
Response #137 for HelloWorld
Response #138 for HelloWorld
Response #139 for HelloWorld
Response #140 for HelloWorld
Response #141 for HelloWorld
Response #142 for HelloWorld
Response #143 for HelloWorld
Response #144 for HelloWorld
Response #145 for HelloWorld
Response #146 for HelloWorld
Response #147 for HelloWorld
Response #148 for HelloWorld
Response #149 for HelloWorld
Response #150 for HelloWorld
Response #151 for HelloWorld
Response #152 for HelloWorld
Response #153 for HelloWorld
Response #154 for HelloWorld
Response #155 for HelloWorld
Response #156 for HelloWorld
Response #157 for HelloWorld
Response #158 for HelloWorld
Response #159 for HelloWorld
Response #160 for HelloWorld
Response #161 for HelloWorld
Response #162 for HelloWorld
Response #163 for HelloWorld
Response #164 for HelloWorld
Response #165 for HelloWorld
Response #166 for HelloWorld
Response #167 for HelloWorld
Response #168 for HelloWorld
Response #169 for HelloWorld
Response #170 for HelloWorld
Response #171 for HelloWorld
Response #172 for HelloWorld
Response #173 for HelloWorld
Response #174 for HelloWorld
Response #175 for HelloWorld
Response #176 for HelloWorld
Response #177 for HelloWorld
Response #178 for HelloWorld
Response #179 for HelloWorld
Response #180 for HelloWorld
Response #181 for HelloWorld
Response #182 for HelloWorld
Response #183 for HelloWorld
Response #184 for HelloWorld
Response #185 for HelloWorld
Response #186 for HelloWorld
Response #187 for HelloWorld
Response #188 for HelloWorld
Response #189 for HelloWorld
Response #190 for HelloWorld
Response #191 for HelloWorld
Response #192 for HelloWorld
Response #193 for HelloWorld
Response #194 for HelloWorld
Response #195 for HelloWorld
Response #196 for HelloWorld
Response #197 for HelloWorld
Response #198 for HelloWorld
Response #199 for HelloWorld
write_loop with write_msgs_.size() = 0
asio 中有一个例子,将待发送的消息缓存在一个双端队列(deque)中。当这个 deque 中未发送的消息太多时(比如 1000 条),我想通过 ConstBufferSequence 把它们批量发送出去。下面的代码应该怎么改?谢谢!
// Appends msg to the outgoing queue and, if the queue was empty beforehand,
// starts an async_write for the message now at the front.
void deliver(const chat_message& msg)
{
    // The original treats a non-empty queue as "a write is already in
    // progress"; in that case the message is only queued here.
    const bool queue_was_idle = write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!queue_was_idle)
        return;

    auto& head = write_msgs_.front();
    boost::asio::async_write(socket_,
        boost::asio::buffer(head.data(), head.length()),
        boost::bind(&chat_session::handle_write, shared_from_this(),
            boost::asio::placeholders::error));
}
// Completion handler for the async_write started by deliver() or by a previous
// handle_write(): on success pops the sent message and writes the next one, on
// failure removes this session from the room.
void handle_write(const boost::system::error_code& error)
{
    if (error)
    {
        room_.leave(shared_from_this());
        return;
    }

    // The front element is the message whose write just completed.
    write_msgs_.pop_front();
    if (write_msgs_.empty())
        return;

    auto& next = write_msgs_.front();
    boost::asio::async_write(socket_,
        boost::asio::buffer(next.data(), next.length()),
        boost::bind(&chat_session::handle_write, shared_from_this(),
            boost::asio::placeholders::error));
}
您可以将双端队列转换为任何对 const 缓冲区序列概念进行建模的容器:
std::vector<asio::const_buffer> buffers;
std::transform(
begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
[](Message const& s) { return asio::buffer(s); });
async_write( //
socket_, buffers,
[this, self = shared_from_this()] //
(error_code ec, std::size_t bytes_written) {
// ...
write_msgs_.clear();
});
这里使用 std::transform 只是出于习惯,你可能更喜欢下面的写法:
// Loop-based equivalent: one const_buffer view per queued message.
std::vector<asio::const_buffer> buffers;
for (auto& queued : write_msgs_) {
    buffers.emplace_back(asio::buffer(queued));
}
现场演示
根据这个最近的例子修改:
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
using Message = std::string;
/// One TCP connection of the demo server.
///
/// Reads NUL-terminated messages from the socket and answers each one with a
/// burst of response lines. Outgoing messages are kept in a deque; when more
/// than `batch_threshold` of them are pending they are sent with a single
/// scatter-gather async_write instead of one write per message.
///
/// No locking is used: the queue is only touched from handlers posted to the
/// socket's executor, which is assumed to be a strand (see do_write).
class chat_session : public std::enable_shared_from_this<chat_session> {
  public:
    chat_session(tcp::socket socket) : socket_(std::move(socket)) {}

    /// Starts the read loop. The session keeps itself alive through the
    /// shared_ptr captured in its pending handlers.
    void start() { do_read(); }

    /// Queues several messages for sending, preserving their order.
    /// Safe to call from any thread: the work is posted to the socket's executor.
    void deliver_many(std::vector<Message> msgs) {
        post(socket_.get_executor(),
             [this, msgs = std::move(msgs), self = shared_from_this()] //
             () mutable {
                 for (auto& msg : msgs) {
                     do_write(std::move(msg));
                 }
             });
    }

    /// Queues a single message for sending.
    /// Safe to call from any thread: the work is posted to the socket's executor.
    void deliver(Message msg) {
        post(socket_.get_executor(),
             [this, msg = std::move(msg), self = shared_from_this()] //
             () mutable { do_write(std::move(msg)); });
    }

  private:
    /// Pending-message count above which the whole queue is written in one call.
    static constexpr std::size_t batch_threshold = 100;

    /// Reads up to and including the next NUL byte, hands the message (without
    /// the terminator) to process_message, then re-arms itself.
    void do_read() {
        async_read_until(
            socket_, asio::dynamic_buffer(incoming_), '\0',
            [this, self = shared_from_this()] //
            (error_code ec, std::size_t length) {
                if (!ec) {
                    // `length` counts the delimiter, hence the `- 1`.
                    process_message(incoming_.substr(0, length - 1));
                    incoming_.erase(0, length);
                    do_read();
                } else if (ec != asio::error::eof) {
                    std::cerr << "Read error: " << ec.message() << std::endl;
                }
            });
    }

    /// Appends a message to the queue and starts the write loop if the queue
    /// was empty (a non-empty queue means a write is already in flight).
    void do_write(Message message)
    {
        write_msgs_.push_back(std::move(message)); // assumed on (implicit) strand
        if (write_msgs_.size() == 1) {
            write_loop();
        }
    }

    /// Sends the pending messages: one at a time while the backlog is small,
    /// or the entire backlog in one gathered write once it exceeds
    /// `batch_threshold`. Re-invokes itself from the completion handler until
    /// the queue is drained.
    void write_loop() {
        std::cerr << "write_loop with write_msgs_.size() = " << write_msgs_.size() << std::endl;
        if (write_msgs_.empty())
            return;

        if (write_msgs_.size() > batch_threshold) {
            // The buffers only reference the strings; the strings stay in
            // write_msgs_ until the completion handler removes them.
            std::vector<asio::const_buffer> buffers;
            buffers.reserve(write_msgs_.size());
            std::transform(
                begin(write_msgs_), end(write_msgs_), back_inserter(buffers),
                [](Message const& s) { return asio::buffer(s); });

            async_write( //
                socket_, buffers,
                [this, self = shared_from_this(), batched = buffers.size()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        // Erase only what was sent: messages queued while the
                        // batch was in flight sit behind the first `batched`
                        // entries and must not be discarded.
                        write_msgs_.erase(write_msgs_.begin(),
                                          write_msgs_.begin() + batched);
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        } else {
            async_write( //
                socket_, asio::buffer(write_msgs_.front()),
                [this, self = shared_from_this()] //
                (error_code ec, std::size_t /*length*/) {
                    if (!ec) {
                        write_msgs_.pop_front();
                        write_loop();
                    } else if (ec != asio::error::eof) {
                        std::cerr << "Write error: " << ec.message() << std::endl;
                    }
                });
        }
    }

    /// Demo request handler: produces 200 response lines for one incoming message.
    void process_message(Message const& message) {
        std::vector<Message> responses;
        responses.reserve(200);
        for (int i = 0; i < 200; ++i) {
            responses.push_back("Response #" + std::to_string(i) + " for " +
                                message + "\n");
        }
        // deliver_many() posts to the socket's executor itself, so it is safe
        // even if this ran on a different thread (not the case in this example).
        deliver_many(std::move(responses));
    }

    tcp::socket socket_;
    Message incoming_;               // bytes received but not yet consumed
    std::deque<Message> write_msgs_; // outgoing queue; front is being written
};
class server {
public:
server(asio::any_io_executor ex, unsigned short port)
: acceptor_(ex, tcp::endpoint(tcp::v4(), port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
make_strand(acceptor_.get_executor()),
[this](error_code ec, tcp::socket&& s) {
if (!ec) {
std::cout << "Accepted " << s.remote_endpoint() << std::endl;
std::make_shared<chat_session>(std::move(s))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main() {
asio::thread_pool ctx;
server s(ctx.get_executor(), 8989);
ctx.join();
}
从客户端发送单个消息时:
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out&
sleep .5; printf 'HelloWorld\0' | nc 127.0.0.1 8989 -w1
显示例如:
Accepted 127.0.0.1:39538
write_loop with write_msgs_.size() = 1
Response #0 for HelloWorld
write_loop with write_msgs_.size() = 199
Response #1 for HelloWorld
Response #2 for HelloWorld
Response #3 for HelloWorld
Response #4 for HelloWorld
Response #5 for HelloWorld
Response #6 for HelloWorld
Response #7 for HelloWorld
Response #8 for HelloWorld
Response #9 for HelloWorld
Response #10 for HelloWorld
Response #11 for HelloWorld
Response #12 for HelloWorld
Response #13 for HelloWorld
Response #14 for HelloWorld
Response #15 for HelloWorld
Response #16 for HelloWorld
Response #17 for HelloWorld
Response #18 for HelloWorld
Response #19 for HelloWorld
Response #20 for HelloWorld
Response #21 for HelloWorld
Response #22 for HelloWorld
Response #23 for HelloWorld
Response #24 for HelloWorld
Response #25 for HelloWorld
Response #26 for HelloWorld
Response #27 for HelloWorld
Response #28 for HelloWorld
Response #29 for HelloWorld
Response #30 for HelloWorld
Response #31 for HelloWorld
Response #32 for HelloWorld
Response #33 for HelloWorld
Response #34 for HelloWorld
Response #35 for HelloWorld
Response #36 for HelloWorld
Response #37 for HelloWorld
Response #38 for HelloWorld
Response #39 for HelloWorld
Response #40 for HelloWorld
Response #41 for HelloWorld
Response #42 for HelloWorld
Response #43 for HelloWorld
Response #44 for HelloWorld
Response #45 for HelloWorld
Response #46 for HelloWorld
Response #47 for HelloWorld
Response #48 for HelloWorld
Response #49 for HelloWorld
Response #50 for HelloWorld
Response #51 for HelloWorld
Response #52 for HelloWorld
Response #53 for HelloWorld
Response #54 for HelloWorld
Response #55 for HelloWorld
Response #56 for HelloWorld
Response #57 for HelloWorld
Response #58 for HelloWorld
Response #59 for HelloWorld
Response #60 for HelloWorld
Response #61 for HelloWorld
Response #62 for HelloWorld
Response #63 for HelloWorld
Response #64 for HelloWorld
Response #65 for HelloWorld
Response #66 for HelloWorld
Response #67 for HelloWorld
Response #68 for HelloWorld
Response #69 for HelloWorld
Response #70 for HelloWorld
Response #71 for HelloWorld
Response #72 for HelloWorld
Response #73 for HelloWorld
Response #74 for HelloWorld
Response #75 for HelloWorld
Response #76 for HelloWorld
Response #77 for HelloWorld
Response #78 for HelloWorld
Response #79 for HelloWorld
Response #80 for HelloWorld
Response #81 for HelloWorld
Response #82 for HelloWorld
Response #83 for HelloWorld
Response #84 for HelloWorld
Response #85 for HelloWorld
Response #86 for HelloWorld
Response #87 for HelloWorld
Response #88 for HelloWorld
Response #89 for HelloWorld
Response #90 for HelloWorld
Response #91 for HelloWorld
Response #92 for HelloWorld
Response #93 for HelloWorld
Response #94 for HelloWorld
Response #95 for HelloWorld
Response #96 for HelloWorld
Response #97 for HelloWorld
Response #98 for HelloWorld
Response #99 for HelloWorld
Response #100 for HelloWorld
Response #101 for HelloWorld
Response #102 for HelloWorld
Response #103 for HelloWorld
Response #104 for HelloWorld
Response #105 for HelloWorld
Response #106 for HelloWorld
Response #107 for HelloWorld
Response #108 for HelloWorld
Response #109 for HelloWorld
Response #110 for HelloWorld
Response #111 for HelloWorld
Response #112 for HelloWorld
Response #113 for HelloWorld
Response #114 for HelloWorld
Response #115 for HelloWorld
Response #116 for HelloWorld
Response #117 for HelloWorld
Response #118 for HelloWorld
Response #119 for HelloWorld
Response #120 for HelloWorld
Response #121 for HelloWorld
Response #122 for HelloWorld
Response #123 for HelloWorld
Response #124 for HelloWorld
Response #125 for HelloWorld
Response #126 for HelloWorld
Response #127 for HelloWorld
Response #128 for HelloWorld
Response #129 for HelloWorld
Response #130 for HelloWorld
Response #131 for HelloWorld
Response #132 for HelloWorld
Response #133 for HelloWorld
Response #134 for HelloWorld
Response #135 for HelloWorld
Response #136 for HelloWorld
Response #137 for HelloWorld
Response #138 for HelloWorld
Response #139 for HelloWorld
Response #140 for HelloWorld
Response #141 for HelloWorld
Response #142 for HelloWorld
Response #143 for HelloWorld
Response #144 for HelloWorld
Response #145 for HelloWorld
Response #146 for HelloWorld
Response #147 for HelloWorld
Response #148 for HelloWorld
Response #149 for HelloWorld
Response #150 for HelloWorld
Response #151 for HelloWorld
Response #152 for HelloWorld
Response #153 for HelloWorld
Response #154 for HelloWorld
Response #155 for HelloWorld
Response #156 for HelloWorld
Response #157 for HelloWorld
Response #158 for HelloWorld
Response #159 for HelloWorld
Response #160 for HelloWorld
Response #161 for HelloWorld
Response #162 for HelloWorld
Response #163 for HelloWorld
Response #164 for HelloWorld
Response #165 for HelloWorld
Response #166 for HelloWorld
Response #167 for HelloWorld
Response #168 for HelloWorld
Response #169 for HelloWorld
Response #170 for HelloWorld
Response #171 for HelloWorld
Response #172 for HelloWorld
Response #173 for HelloWorld
Response #174 for HelloWorld
Response #175 for HelloWorld
Response #176 for HelloWorld
Response #177 for HelloWorld
Response #178 for HelloWorld
Response #179 for HelloWorld
Response #180 for HelloWorld
Response #181 for HelloWorld
Response #182 for HelloWorld
Response #183 for HelloWorld
Response #184 for HelloWorld
Response #185 for HelloWorld
Response #186 for HelloWorld
Response #187 for HelloWorld
Response #188 for HelloWorld
Response #189 for HelloWorld
Response #190 for HelloWorld
Response #191 for HelloWorld
Response #192 for HelloWorld
Response #193 for HelloWorld
Response #194 for HelloWorld
Response #195 for HelloWorld
Response #196 for HelloWorld
Response #197 for HelloWorld
Response #198 for HelloWorld
Response #199 for HelloWorld
write_loop with write_msgs_.size() = 0