如何从多个线程安全地写入套接字?
How to safely write to a socket from multiple threads?
我正在使用 asio(非 Boost 版本)创建一个 TCP 服务器。我的代码虽然可以运行,但实现方式并不正确,因为我从多个线程调用了 asio::async_write。
我认为应该使用 strand,但读得越多,就越困惑。
#include <cstdlib>
#include <iostream>
#include <utility>
#include <thread>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"
using asio::ip::tcp;
/// One client connection (code as posted in the question).
///
/// Reads NUL-delimited messages from the socket and processes each one on its
/// own detached thread, which then writes the reply back.
///
/// Known problems, which are the subject of the question:
///  - do_write() is reached from the detached worker threads, so
///    asio::async_write is started on socket_ from several threads at once.
///  - do_write() hands asio::buffer(message) to an asynchronous operation,
///    but `message` is a by-value parameter that is destroyed when do_write()
///    returns.
///  - the worker threads receive a raw `this`, so nothing keeps the session
///    alive while they run.
class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {
    }

    /// Sends the handler's initial message synchronously, then starts reading.
    void start()
    {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

private:
    /// Reads whatever is available, splits the accumulated data on '\0' and
    /// spawns one detached thread per complete message.
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    buffer_.append(data_, length);
                    size_t pos;
                    // Messages are terminated by a NUL byte.
                    while ((pos = buffer_.find('\0')) != std::string::npos)
                    {
                        std::string message = buffer_.substr(0, pos);
                        buffer_.erase(0, pos + 1); // drop message and delimiter
                        std::thread(&session::process_message, this, message).detach();
                    }
                    do_read();
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Read error: " << ec.message() << '\n';
                }
            });
    }

    /// Starts an asynchronous write of `message`; errors other than EOF are
    /// logged to stderr.
    void do_write(std::string message)
    {
        auto self(shared_from_this());
        // NOTE(review): `message` does not outlive this call; see class comment.
        asio::async_write(socket_, asio::buffer(message),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Write error: " << ec.message() << '\n';
                }
            });
    }

    /// Runs on a detached worker thread: computes the reply and writes it.
    void process_message(std::string message) {
        std::string response = handler.processMessage(message);
        do_write(response);
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];   // scratch space for a single async_read_some
    std::string buffer_;      // bytes received but not yet split into messages
    MessageHandler handler;
};
/// Listens on an IPv4 TCP endpoint and creates one session per accepted
/// connection.
class server
{
public:
    /// Binds the acceptor to `port` and queues the first accept.
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , socket_(io_context)
    {
        do_accept();
    }

private:
    /// Queues a single asynchronous accept into socket_. The completion
    /// handler re-queues the accept whether or not it succeeded.
    void do_accept()
    {
        auto on_accept = [this](std::error_code ec)
        {
            if (!ec)
            {
                // The accepted socket is moved into the new session.
                auto client = std::make_shared<session>(std::move(socket_));
                client->start();
            }
            do_accept();
        };
        acceptor_.async_accept(socket_, std::move(on_accept));
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;   // receives the next accepted connection
};
void serverInit()
{
try
{
asio::io_context io_context;
server s(io_context, 0);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << '\n';
}
}
您只有 1 个线程在运行 IO 服务,因此所有处理程序都在隐式 strand 上执行(参见 Why do I need strand per connection when using boost::asio?),在您开始使用新线程之前无需担心。
因此,最简单的修复是确保回复也在 IO 服务上发送:
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
现在,如果您希望能够在多个线程上运行 IO 服务,只需确保套接字使用 strand 执行器即可。
然而,这并不能保证不会出现重叠的 async_write 操作,因为传入消息的处理速度可能比回复发送的速度快。因此惯用的解决方法是:
排队
在我的示例中,我通常将这个 FIFO 队列称为 "outbox_";出于迭代器/引用稳定性的考虑,我更喜欢使用 deque
(参见 Iterator invalidation rules for C++ containers):
// Queues `message` for sending. Must be called on the (implicit) strand.
// The write loop is started only when the queue was empty beforehand; if it
// was not, a write is already in flight and will pick this message up.
void do_write(std::string message)
{
    bool const was_idle = outbox_.empty();
    outbox_.push_back(std::move(message));
    if (was_idle)
        write_loop();
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << '\n';
}
});
}
演示
下面是修正后的完整代码,其中 messages.h 用存根代替。
它还使用现有的 async_read_until 组合操作,
大大简化了读取/缓冲处理,您手动编写的那些逻辑它都能完成。
#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
#include "messages.h"
#else // mock messages.h
#include <boost/lexical_cast.hpp>
#include <iomanip>
/// Mock replacement for the MessageHandler from messages.h.
struct MessageHandler {
    /// Greeting sent once when a session starts.
    std::string initialMessage() const { return "Initial\n"; }

    /// Returns "Processed " followed by the quoted request and a newline.
    std::string processMessage(std::string const& req) const {
        std::string reply = "Processed ";
        reply += boost::lexical_cast<std::string>(std::quoted(req));
        reply += "\n";
        return reply;
    }
};
#endif
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
handler = MessageHandler();
asio::write(socket_, asio::buffer(handler.initialMessage()));
do_read();
}
private:
void do_read() {
async_read_until(
socket_, asio::dynamic_buffer(buffer_), '[=12=]',
[this, self = shared_from_this()] //
(error_code ec, std::size_t length) {
if (!ec) {
std::thread(&session::process_message, this, buffer_.substr(0, length - 1)).detach();
buffer_.erase(0, length);
do_read();
} else if (ec != asio::error::eof) {
std::cerr << "Read error: " << ec.message() << std::endl;
}
});
}
void do_write(std::string message)
{
outbox_.push_back(std::move(message)); // assumed on (implicit) strand
if (outbox_.size() == 1) {
write_loop();
}
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << std::endl;
}
});
}
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
// dispatch/post to executor because we are on a different thread
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
tcp::socket socket_;
std::string buffer_;
std::deque<std::string> outbox_;
MessageHandler handler;
};
class server
{
public:
server(asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
socket_(io_context)
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
void serverInit() {
try {
asio::io_context io_context;
server s(io_context, 8989);
io_context.run();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
// Program entry point: runs the demo server.
int main()
{
    serverInit();
    return 0;
}
一次性突发发送一批请求时:
printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1
服务器正确打印,例如:
Accepted 127.0.0.1:34862
客户端收到,例如:
Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"
附加内容:添加 strand
只需最小的改动:
class server
{
public:
server(asio::any_io_executor ex, unsigned short port)
: acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket&& s) {
if (!ec) {
std::cout << "Accepted " << s.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(s))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
void serverInit() {
try {
asio::thread_pool io_context;
server s(io_context.get_executor(), 8989);
io_context.join();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
现场演示:
我正在使用 asio(非 Boost 版本)创建一个 TCP 服务器。我的代码虽然可以运行,但实现方式并不正确,因为我从多个线程调用了 asio::async_write。
我认为应该使用 strand,但读得越多,就越困惑。
#include <cstdlib>
#include <iostream>
#include <utility>
#include <thread>
#include <asio/ts/buffer.hpp>
#include <asio/ts/internet.hpp>
#include "messages.h"
using asio::ip::tcp;
/// One client connection (code as posted in the question).
///
/// Reads NUL-delimited messages from the socket and processes each one on its
/// own detached thread, which then writes the reply back.
///
/// Known problems, which are the subject of the question:
///  - do_write() is reached from the detached worker threads, so
///    asio::async_write is started on socket_ from several threads at once.
///  - do_write() hands asio::buffer(message) to an asynchronous operation,
///    but `message` is a by-value parameter that is destroyed when do_write()
///    returns.
///  - the worker threads receive a raw `this`, so nothing keeps the session
///    alive while they run.
class session
    : public std::enable_shared_from_this<session>
{
public:
    session(tcp::socket socket)
        : socket_(std::move(socket))
    {
    }

    /// Sends the handler's initial message synchronously, then starts reading.
    void start()
    {
        handler = MessageHandler();
        asio::write(socket_, asio::buffer(handler.initialMessage()));
        do_read();
    }

private:
    /// Reads whatever is available, splits the accumulated data on '\0' and
    /// spawns one detached thread per complete message.
    void do_read()
    {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length),
            [this, self](std::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    buffer_.append(data_, length);
                    size_t pos;
                    // Messages are terminated by a NUL byte.
                    while ((pos = buffer_.find('\0')) != std::string::npos)
                    {
                        std::string message = buffer_.substr(0, pos);
                        buffer_.erase(0, pos + 1); // drop message and delimiter
                        std::thread(&session::process_message, this, message).detach();
                    }
                    do_read();
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Read error: " << ec.message() << '\n';
                }
            });
    }

    /// Starts an asynchronous write of `message`; errors other than EOF are
    /// logged to stderr.
    void do_write(std::string message)
    {
        auto self(shared_from_this());
        // NOTE(review): `message` does not outlive this call; see class comment.
        asio::async_write(socket_, asio::buffer(message),
            [this, self](std::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                }
                else if (ec != asio::error::eof)
                {
                    std::cerr << "Write error: " << ec.message() << '\n';
                }
            });
    }

    /// Runs on a detached worker thread: computes the reply and writes it.
    void process_message(std::string message) {
        std::string response = handler.processMessage(message);
        do_write(response);
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];   // scratch space for a single async_read_some
    std::string buffer_;      // bytes received but not yet split into messages
    MessageHandler handler;
};
/// Listens on an IPv4 TCP endpoint and creates one session per accepted
/// connection.
class server
{
public:
    /// Binds the acceptor to `port` and queues the first accept.
    server(asio::io_context& io_context, unsigned short port)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , socket_(io_context)
    {
        do_accept();
    }

private:
    /// Queues a single asynchronous accept into socket_. The completion
    /// handler re-queues the accept whether or not it succeeded.
    void do_accept()
    {
        auto on_accept = [this](std::error_code ec)
        {
            if (!ec)
            {
                // The accepted socket is moved into the new session.
                auto client = std::make_shared<session>(std::move(socket_));
                client->start();
            }
            do_accept();
        };
        acceptor_.async_accept(socket_, std::move(on_accept));
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;   // receives the next accepted connection
};
void serverInit()
{
try
{
asio::io_context io_context;
server s(io_context, 0);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << '\n';
}
}
您只有 1 个线程在运行 IO 服务,因此所有处理程序都在隐式 strand 上执行(参见 Why do I need strand per connection when using boost::asio?),在您开始使用新线程之前无需担心。
因此,最简单的修复是确保回复也在 IO 服务上发送:
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
现在,如果您希望能够在多个线程上运行 IO 服务,只需确保套接字使用 strand 执行器即可。
然而,这并不能保证不会出现重叠的 async_write 操作,因为传入消息的处理速度可能比回复发送的速度快。因此惯用的解决方法是:
排队
在我的示例中,我通常将这个 FIFO 队列称为 "outbox_";出于迭代器/引用稳定性的考虑,我更喜欢使用 deque
(参见 Iterator invalidation rules for C++ containers):
// Queues `message` for sending. Must be called on the (implicit) strand.
// The write loop is started only when the queue was empty beforehand; if it
// was not, a write is already in flight and will pick this message up.
void do_write(std::string message)
{
    bool const was_idle = outbox_.empty();
    outbox_.push_back(std::move(message));
    if (was_idle)
        write_loop();
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << '\n';
}
});
}
演示
下面是修正后的完整代码,其中 messages.h 用存根代替。
它还使用现有的 async_read_until 组合操作,
大大简化了读取/缓冲处理,您手动编写的那些逻辑它都能完成。
#include <boost/asio.hpp>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <utility>
#if 0
#include "messages.h"
#else // mock messages.h
#include <boost/lexical_cast.hpp>
#include <iomanip>
/// Mock replacement for the MessageHandler from messages.h.
struct MessageHandler {
    /// Greeting sent once when a session starts.
    std::string initialMessage() const { return "Initial\n"; }

    /// Returns "Processed " followed by the quoted request and a newline.
    std::string processMessage(std::string const& req) const {
        std::string reply = "Processed ";
        reply += boost::lexical_cast<std::string>(std::quoted(req));
        reply += "\n";
        return reply;
    }
};
#endif
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) {}
void start() {
handler = MessageHandler();
asio::write(socket_, asio::buffer(handler.initialMessage()));
do_read();
}
private:
void do_read() {
async_read_until(
socket_, asio::dynamic_buffer(buffer_), '[=12=]',
[this, self = shared_from_this()] //
(error_code ec, std::size_t length) {
if (!ec) {
std::thread(&session::process_message, this, buffer_.substr(0, length - 1)).detach();
buffer_.erase(0, length);
do_read();
} else if (ec != asio::error::eof) {
std::cerr << "Read error: " << ec.message() << std::endl;
}
});
}
void do_write(std::string message)
{
outbox_.push_back(std::move(message)); // assumed on (implicit) strand
if (outbox_.size() == 1) {
write_loop();
}
}
void write_loop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()] //
(error_code ec, std::size_t /*length*/) {
if (!ec) {
outbox_.pop_front();
write_loop();
} else if (ec != asio::error::eof) {
std::cerr << "Write error: " << ec.message() << std::endl;
}
});
}
void process_message(std::string const& message) {
std::string response = handler.processMessage(message);
// dispatch/post to executor because we are on a different thread
post(socket_.get_executor(),
std::bind(&session::do_write, shared_from_this(), response));
}
tcp::socket socket_;
std::string buffer_;
std::deque<std::string> outbox_;
MessageHandler handler;
};
class server
{
public:
server(asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
socket_(io_context)
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::cout << "Accepted " << socket_.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
void serverInit() {
try {
asio::io_context io_context;
server s(io_context, 8989);
io_context.run();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
// Program entry point: runs the demo server.
int main()
{
    serverInit();
    return 0;
}
一次性突发发送一批请求时:
printf 'Message%d\0' {1..100} | nc 127.0.0.1 8989 -w1
服务器正确打印,例如:
Accepted 127.0.0.1:34862
客户端收到,例如:
Initial
Processed "Message2"
Processed "Message1"
Processed "Message4"
Processed "Message3"
Processed "Message5"
Processed "Message6"
Processed "Message7"
Processed "Message8"
Processed "Message9"
Processed "Message10"
Processed "Message11"
Processed "Message12"
Processed "Message13"
Processed "Message15"
Processed "Message16"
Processed "Message14"
Processed "Message18"
Processed "Message19"
Processed "Message20"
Processed "Message21"
Processed "Message22"
Processed "Message23"
Processed "Message24"
Processed "Message25"
Processed "Message26"
Processed "Message27"
Processed "Message28"
Processed "Message29"
Processed "Message30"
Processed "Message31"
Processed "Message32"
Processed "Message33"
Processed "Message34"
Processed "Message35"
Processed "Message17"
Processed "Message36"
Processed "Message38"
Processed "Message39"
Processed "Message40"
Processed "Message41"
Processed "Message42"
Processed "Message43"
Processed "Message44"
Processed "Message45"
Processed "Message46"
Processed "Message47"
Processed "Message48"
Processed "Message49"
Processed "Message50"
Processed "Message51"
Processed "Message52"
Processed "Message53"
Processed "Message54"
Processed "Message55"
Processed "Message56"
Processed "Message57"
Processed "Message58"
Processed "Message59"
Processed "Message60"
Processed "Message61"
Processed "Message62"
Processed "Message63"
Processed "Message64"
Processed "Message65"
Processed "Message66"
Processed "Message67"
Processed "Message68"
Processed "Message69"
Processed "Message70"
Processed "Message71"
Processed "Message72"
Processed "Message73"
Processed "Message74"
Processed "Message75"
Processed "Message76"
Processed "Message77"
Processed "Message78"
Processed "Message79"
Processed "Message80"
Processed "Message81"
Processed "Message82"
Processed "Message83"
Processed "Message84"
Processed "Message85"
Processed "Message86"
Processed "Message87"
Processed "Message88"
Processed "Message89"
Processed "Message90"
Processed "Message91"
Processed "Message92"
Processed "Message93"
Processed "Message94"
Processed "Message95"
Processed "Message96"
Processed "Message97"
Processed "Message98"
Processed "Message99"
Processed "Message100"
Processed "Message37"
附加内容:添加 strand
只需最小的改动:
class server
{
public:
server(asio::any_io_executor ex, unsigned short port)
: acceptor_(ex, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(make_strand(acceptor_.get_executor()), [this](error_code ec, tcp::socket&& s) {
if (!ec) {
std::cout << "Accepted " << s.remote_endpoint() << std::endl;
std::make_shared<session>(std::move(s))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
void serverInit() {
try {
asio::thread_pool io_context;
server s(io_context.get_executor(), 8989);
io_context.join();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
}
现场演示: