Boost.Asio:每个 connection/socket 使用 `io_service` 是好事吗?

Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?

我想创建一个实现一个线程一个连接模型的应用程序。但是每个连接都必须是可停止的。我试过 this boost.asio example 实现了我想要的阻塞版本。但经过一番询问后,我发现没有可靠的方法来停止该示例的会话。所以我试图实现我自己的。我不得不使用异步函数。因为我想让一个线程只管理一个连接并且没有办法控制哪个异步作业被用于哪个线程,所以我决定对每个 connection/socket/thread.

使用 io_service

那么这个方法好吗,你知道更好的方法吗?

我的代码在这里,您可以检查和查看它:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '[=13=]',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '[=13=]';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '[=13=]');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

没有。每个连接使用一个 io_service 对象绝对是一种味道。特别是因为您还运行在专用线程上建立每个连接。

此时你必须问问自己,异步给你带来了什么?您可以让所有代码同步并具有完全相同的线程数等。

很明显,您希望将连接多路复用到数量少得多的服务上。在实践中有一些合理的模型,例如

  1. 具有单个服务线程的单个 io_service(这通常很好)。在服务上排队的任何任务都不会阻塞很长时间,否则延迟会受到影响

  2. 单个 io_service 具有多个执行处理程序的线程。池中的线程数应该足以服务最大值。支持的同时 CPU 密集任务数(或者,延迟将开始上升)

  3. 每个线程一个 io_service,通常每个逻辑核心一个线程,并且具有线程关联性,因此它 "sticks" 到那个核心。这可能是缓存位置的理想选择

更新:演示

这是一个演示,使用上面的选项 1. 显示惯用风格:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '[=10=]',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '[=10=]');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

我在没有用户干预的情况下将 main() 修改为 运行 2 秒。这样我就可以演示了Live On Coliru(当然,它限制了w.r.t客户端进程的数量)。

如果你运行它有很多(很多)的客户,使用例如

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 11235)& done; wait)

你会发现两秒钟 window 处理所有这些:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

如果你真的发狂并加注 1000 到例如100000 在那里,你会得到类似的东西:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

在服务器上重复 2 秒 运行s。