如何在 Boost Asio 中使用 Scatter/Gather IO 发送包含向量的结构?

How to use Scatter/Gather IO in Boost Asio for sending a struct containing a vector?

我正在尝试使用 Boost Asio 构建一个简单的 IPC 协议,其中服务器端将向客户端发送一个包含 vector<uint8_t> 的结构。有人建议我使用 scatter/gather IO 方法,但我无法让它工作,因为客户端似乎只接收到它期望的部分数据,并且它一直在无限期地等待其余数据到达,即使它应该已经在那里。 这就是我现在拥有的:

// File: client.cpp

#include <iostream>
#include <vector>
#include <boost/asio.hpp>

#include "ipc_common.hpp"

namespace ba = boost::asio;
using boost::asio::ip::tcp;

// Client entry point: sends one propertiesPacket to the server and prints the reply.
// Usage: client [host = localhost] [port = 6869]
int main(int argc, char *argv[])
{
    ba::io_context io;

    std::vector<std::string> args(argv, argv + argc);

    // Normalise args to exactly {program, host, port}, substituting the defaults
    // for whatever was not given on the command line.
    switch (args.size()) {
        case 1:
            args = {args.at(0), "localhost", "6869"};
            break;
        case 2:
            args = {args.at(0), args.at(1), "6869"};
            break;
        case 3:
            args = {args.at(0), args.at(1), args.at(2)};
            break;
        default:
            std::clog << "usage: " << args.at(0) << " [host = localhost] [port = 6869]" << std::endl;
            return 1;
    }

    try {
        propertiesPacket properties;
        properties.val1 = 9;
        properties.val2 = 45;

        tcp::socket socket(io);
        tcp::resolver resolver(io);

        // Connect, then send the request as the raw bytes of the struct.
        connect(socket, resolver.resolve(args.at(1), args.at(2)));
        write(socket, ba::buffer(&properties, sizeof(properties)));

        // The reply is expected to begin with a 16-bit size field.
        uint16_t responseSize {};
        ba::read(socket, ba::buffer(&responseSize, sizeof(uint16_t)));

        std::clog << "client responseSize: " << responseSize << std::endl;
        processedData response {};

        // NOTE(review): processedData (ipc_common.hpp) declares `val` and `values`
        // but no `size` member, so the first buffer names a member that does not exist.
        // NOTE(review): &response.values is the address of the std::vector object
        // itself, not of the elements it owns; the vector is also still empty here.
        std::vector<ba::mutable_buffer> responseBuffers {
            ba::buffer(&response.size, sizeof(uint16_t)),
            ba::buffer(&response.values, responseSize - sizeof(uint8_t))
        };
        // ba::read returns only once every buffer in the sequence is full (or on error).
        ba::read(socket, responseBuffers);

        std::clog << response.serialize();

        return 0;
    } catch (std::exception &e) {
        std::clog << e.what() << std::endl;
        return 1;
    }
}
// File: server.cpp

#include <vector>
#include <boost/asio.hpp>

#include "ipc_common.hpp"

namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using TCPSocket = tcp::socket;

// One accepted client connection. Each round reads a propertiesPacket from the
// socket, calls processData(), and writes a reply assembled from three buffers.
// Pending handlers capture a shared_ptr obtained from shared_from_this().
class ServerConnection
    : public std::enable_shared_from_this<ServerConnection>
{
    public:
        // Takes over the already-connected socket.
        ServerConnection(TCPSocket socket)
            : socket_(std::move(socket))
        { }


        // Logs the call and starts the first read.
        void start()
        {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            doRead();
        }


    private:
        // Reads into properties_, then sends the reply; re-arms itself after a
        // successful write.
        void doRead()
        {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            auto self(shared_from_this());
            socket_.async_read_some(ba::buffer(&properties_, sizeof(properties_)),
                                    [this, self](error_code ec, std::size_t length)
                                    {
                                        std::clog << "received " << length << std::endl;
                                        if (!ec) {
                                            processData();

                                            // NOTE(review): filePacketSize_ is never assigned anywhere
                                            // in this class, so the first buffer sends an indeterminate value.
                                            // NOTE(review): the third buffer covers the bytes of the
                                            // std::vector object (sizeof(filePacket_.values)), not the
                                            // elements stored in it.
                                            std::vector<ba::const_buffer> msg {
                                                ba::buffer(&filePacketSize_, sizeof(uint16_t)),
                                                ba::buffer(&filePacket_.val, sizeof(filePacket_.val)),
                                                ba::buffer(&filePacket_.values, sizeof(filePacket_.values))};
                                            std::clog << "filePacketSize_: " << filePacketSize_ << std::endl;

                                            ba::async_write(socket_, msg,
                                                            [this, self = shared_from_this()](error_code ec, std::size_t length)
                                                            {
                                                                std::clog << "written " << length << std::endl;
                                                                if (!ec) doRead();
                                                            });
                                        }
                                    });
        }


        // Copies properties_.val1 into filePacket_.val and appends that many
        // random bytes.
        void processData()
        {
            filePacket_.val = properties_.val1;

            // Just for demonstration, we fill the vector with random values
            std::random_device rd;
            std::mt19937 re(rd()) ;
            std::uniform_int_distribution<uint8_t> dist(0, 255);
            for (size_t i {}; i < filePacket_.val; ++i) {
                // NOTE(review): `processedData` is the struct type; the member object
                // declared below is `filePacket_`.
                processedData.values.push_back(dist(re));
            }
        }

        TCPSocket socket_;

        propertiesPacket properties_;   // most recently received request
        processedData filePacket_;      // reply payload
        uint16_t filePacketSize_;       // sent as the first reply field; see note in doRead()
};

// Accepts TCP connections on an IPv4 port and starts a ServerConnection for
// each accepted socket.
class Server
{
    public:
        using IOContext = ba::io_context;
        using TCPAcceptor = tcp::acceptor;

        // Binds the acceptor to tcp::v4() on `port` and arms the first accept.
        Server(IOContext& io, uint16_t port)
            : socket_(io),
              acceptor_(io, {tcp::v4(), port})
        {
            doAccept();
        }

    private:
        // Arms one asynchronous accept into socket_. On success the socket is
        // moved into a new ServerConnection and the accept is armed again; on
        // failure the error is logged and no further accept is started.
        void doAccept()
        {
            std::clog << __PRETTY_FUNCTION__ << std::endl;
            acceptor_.async_accept(socket_,
                [this](error_code ec)
                {
                    if (!ec) {
                        std::clog << "Accepted " << socket_.remote_endpoint() << std::endl;
                        std::make_shared<ServerConnection>(std::move(socket_))->start();
                        doAccept();
                    }
                    else {
                        std::clog << "Accept " << ec.message() << std::endl;
                    }
                });
        }

        TCPSocket socket_;       // receives the next accepted connection
        TCPAcceptor acceptor_;
};

// Server entry point. Usage: server [port = 6869]
int main(int argc, char* argv[])
{
    std::vector<std::string> args(argv, argv + argc);

    // Normalise args to exactly {program, port}, defaulting the port to 6869.
    switch (args.size()) {
        case 1:
            args = {args.at(0), "6869"};
            break;
        case 2:
            args = {args.at(0), args.at(1)};
            break;
        default:
            std::clog << "usage: " << args.at(0) << " [port = 6869]" << std::endl;
            return 1;
    }

    try {
        ba::io_context io;
        // The int returned by std::stoi is converted to Server's uint16_t port parameter.
        Server server(io, std::stoi(args.at(1)));

        // Runs the handlers queued on io until no work is left.
        io.run();
    } catch (std::exception &e) {
        std::clog << e.what() << std::endl;
        return 1;
    }

    return 0;
}

// File: ipc_common.hpp

#include <cstdint>
#include <vector>
#include <sstream>
#include <string>

// Request sent by the client; client.cpp transmits it as the raw bytes of the struct.
struct propertiesPacket
{
    uint8_t val1;   // server.cpp copies this into processedData::val
    uint8_t val2;
};

// Reply payload: a one-byte value plus a variable-length list of bytes.
struct processedData
{
    uint8_t val;
    std::vector<uint8_t> values;

    // Renders the struct as text: a "val: N" line followed by the elements of
    // `values`, each followed by a space, and a final newline.
    std::string serialize()
    {
        std::stringstream sstream;
        // The cast makes `val` print as a number.
        sstream << "val: " << (unsigned int)val << std::endl;
        for (const auto &i : values)
        {
            // NOTE(review): no cast here; uint8_t is typically an alias of
            // unsigned char, in which case each element is inserted as a raw
            // character rather than as a number.
            sstream << i << " ";
        }

        sstream << std::endl;
        return sstream.str();
    }
};

我做错了什么?

示例似乎已损坏。

  1. 首先,args.at(3) 和 args.at(4) 必然会抛出异常,因为前面的 switch 语句在客户端收到超过 2 个命令行参数时总会直接退出(default: 分支)。

  2. 其次,客户端read使用&response.size,但根本不存在这样的成员。

  3. 第三,服务器的 processData 使用了 processedData.val,但 processedData 根本不是成员(它是一个类型,可能应该改为 filePacket_.val)。

  4. 第四,它从 properties_.val 取值赋值,而这个成员根本不存在(只有 val1 和 val2)。

  5. 接下来,rd不用于初始化URBG(随机引擎,re)。相反,它会调用一个名为 random_device() 的未知标识符。可能应该改为 rd()

  6. 再说一遍,processedData.val 你的意思可能是 filePacket_.val

  7. 你写 processedData.push_back(...) 的地方可能是想说 filePacket_.values.push_back(...)...

  8. 服务器的 void doAccept() 后面有一个多余的分号 ;

  9. 相比之下,ipc_common.hpp 中每个结构定义之后都缺少分号 ;

  10. processedData 结构定义了一个从未使用过的 serialize() 方法。它还使用了 C 风格的类型转换,而此处改用 static_cast<unsigned>(val) 才是更安全的写法。

  11. 奇怪的是,服务器“解析”args,并提供了一个可选的默认值,但它从不使用它。相反,它使用 argv[1] 而根本不检查 argc。哎呀

抛开这些,现在是令人困惑的部分:您希望如何写入这些值?这是不正确的:

// Incorrect as written: the last buffer spans the bytes of the std::vector
// object itself (sizeof(filePacket_.values)), not the elements it manages.
std::vector<ba::const_buffer> msg{
    ba::buffer(&filePacketSize_, sizeof(uint16_t)),
    ba::buffer(&filePacket_.val, sizeof(filePacket_.val)),
    ba::buffer(&filePacket_.values, sizeof(filePacket_.values))};

values 是一个 std::vector<>,所以不能指望把它当作一段原始字节来按位读写。这样做只会导致未定义行为 (Undefined Behaviour)。

此外,还不清楚为什么 filePacketSize_ 被写入(它甚至从未被分配,甚至从未被初始化为一个确定的值)。

在客户端,你读取了一个 responseSize,就好像服务器会发送它一样……也许你是想让这两端保持同步。


建议的方法

我会去掉单独的大小字段,因为向量本身已经记录了自己的大小。我还会确保 processData 不是一味地 push_back,否则向量会在每次请求后不断增长。

我会制定一个协议,在消息本身之前实际发送消息大小,并确保它是正确的。

为了简单起见,我们也让随机数据自然可打印 (a..z):

// Refills filePacket_.values with properties_.val1 random lowercase letters.
// The vector is cleared first, so repeated requests do not make it grow.
void processData()
{
    // Just for demonstration, we fill the vector with random characters
    std::mt19937 re(std::random_device{}());
    // std::uniform_int_distribution is only specified for short, int, long,
    // long long and their unsigned counterparts. uint8_t is a character type,
    // so draw ints and narrow each result explicitly.
    std::uniform_int_distribution<int> dist('a', 'z');
    filePacket_.values.clear();
    std::generate_n(std::back_inserter(filePacket_.values), properties_.val1,
                    [&] { return static_cast<uint8_t>(dist(re)); });
}

然后在写入(发送)时,我们这样做:

processData();

// A one-element array lets ba::buffer() deduce the byte count by itself.
// NOTE(review): if msg is handed to an asynchronous write, `length` has to
// stay alive until that operation completes — confirm at the call site.
size_t length[] { filePacket_.values.size() };

// Gather list: the element count first, then the vector's element storage.
std::vector<ba::const_buffer> msg{
    ba::buffer(length),
    ba::buffer(filePacket_.values)};

Note how, again, we avoid manually specifying any buffer sizes. Also, we let the library figure out that values is a vector of POD elements and do the math to calculate the correct start address and buffer size for the element data.

在客户端,我们做相反的事情:

// Read the element count that precedes the payload.
size_t length = 0;
ba::read(socket, ba::buffer(&length, sizeof(length)));

// Size the vector up front so that ba::buffer() spans exactly `length` elements.
response.values.resize(length);
ba::read(socket, ba::buffer(response.values));

(这里无法避免手写 sizeof(length),否则代码会变得比我希望的更笨拙。)

完整演示

  • 文件ipc_common.hpp

     // File: ipc_common.hpp
     #include <cstdint>
     #include <sstream>
     #include <string>
     #include <vector>
    
     /// Request sent by the client as two raw bytes.
     /// Members are value-initialized so a default-constructed packet never
     /// holds indeterminate bytes.
     struct propertiesPacket {
         uint8_t val1{};  ///< number of random characters the server generates
         uint8_t val2{};  ///< transmitted, but not read by the server in this demo
     };
    
     /// Reply payload: the bytes the server generated for one request.
     struct processedData
     {
         std::vector<std::uint8_t> values;
     };
    
  • 文件server.cpp

     #include <boost/asio.hpp>
     #include <algorithm>
     #include <cstddef>
     #include <iostream>
     #include <iterator>
     #include <memory>
     #include <random>
     #include <string>
     #include <vector>
    
     #include "ipc_common.hpp"
    
     namespace ba = boost::asio;
     using boost::asio::ip::tcp;
     using boost::system::error_code;
     using TCPSocket = tcp::socket;
    
     /// One accepted client connection: repeatedly reads a propertiesPacket and
     /// answers with a length-prefixed block of random lowercase letters.
     ///
     /// Instances must be owned by a std::shared_ptr; every pending handler
     /// captures shared_from_this() so the object outlives its operations.
     class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
       public:
         ServerConnection(TCPSocket socket) : socket_(std::move(socket))
         {
         }

         /// Logs the call and starts the first read.
         void start()
         {
             std::clog << __PRETTY_FUNCTION__ << std::endl;
             doRead();
         }

       private:
         /// Waits for one complete propertiesPacket, then sends the reply.
         void doRead()
         {
             std::clog << __PRETTY_FUNCTION__ << std::endl;
             auto self(shared_from_this());
             // async_read (unlike async_read_some) completes only once the whole
             // buffer is filled, so properties_ is never left half-received.
             ba::async_read(
                 socket_, ba::buffer(&properties_, sizeof(properties_)),
                 [this, self](error_code ec, std::size_t received) {
                     std::clog << "received " << received << std::endl;
                     if (ec)
                         return;

                     processData();
                     doWrite();
                 });
         }

         /// Sends the element count followed by the elements, then reads again.
         void doWrite()
         {
             // Asio does not copy the bytes behind a buffer: they must stay valid
             // until the completion handler runs. The count therefore lives in
             // a member instead of a local that dies when this function returns.
             // It is sent in the host's native std::size_t representation, which
             // the receiving side has to share.
             length_ = filePacket_.values.size();

             std::vector<ba::const_buffer> msg{
                 ba::buffer(&length_, sizeof(length_)),
                 ba::buffer(filePacket_.values)};

             auto self(shared_from_this());
             ba::async_write(socket_, msg,
                             [this, self](error_code ec, std::size_t written) {
                                 std::clog << "written " << written << std::endl;
                                 if (!ec)
                                     doRead();
                             });
         }

         /// Refills filePacket_.values with properties_.val1 random letters.
         void processData()
         {
             // Just for demonstration, we fill the vector with random characters
             std::mt19937 re(std::random_device{}());
             // uniform_int_distribution is not specified for character types
             // such as uint8_t; draw ints and narrow explicitly.
             std::uniform_int_distribution<int> dist('a', 'z');
             filePacket_.values.clear();
             std::generate_n(std::back_inserter(filePacket_.values), properties_.val1,
                             [&] { return static_cast<uint8_t>(dist(re)); });
         }

         TCPSocket socket_;

         propertiesPacket properties_{};  // most recently received request
         processedData    filePacket_;    // reply payload; must outlive async_write
         std::size_t      length_{};      // reply length prefix; must outlive async_write
     };
    
     /// Listens on an IPv4 TCP port and hands every accepted socket to a new
     /// ServerConnection.
     class Server {
       public:
         using IOContext   = ba::io_context;
         using TCPAcceptor = tcp::acceptor;

         /// Binds the acceptor to `port` and arms the first accept.
         Server(IOContext& io, uint16_t port)
             : socket_(io)
             , acceptor_(io, {tcp::v4(), port})
         {
             doAccept();
         }

       private:
         /// Arms one asynchronous accept; re-arms itself after each success.
         void doAccept()
         {
             std::clog << __PRETTY_FUNCTION__ << std::endl;
             acceptor_.async_accept(socket_, [this](error_code ec) {
                 if (ec) {
                     // A failed accept is only reported; no new accept is armed.
                     std::clog << "Accept " << ec.message() << std::endl;
                     return;
                 }

                 std::clog << "Accepted " << socket_.remote_endpoint() << std::endl;
                 auto connection = std::make_shared<ServerConnection>(std::move(socket_));
                 connection->start();
                 doAccept();
             });
         }

         TCPSocket   socket_;    // receives the next accepted connection
         TCPAcceptor acceptor_;
     };
    
     /// Server entry point. Usage: server [port = 6869]
     /// Returns 1 on a usage error, an out-of-range port, or any exception.
     int main(int argc, char* argv[])
     {
         std::vector<std::string> args(argv, argv + argc);

         switch (args.size()) {
             case 1: args.push_back("6869"); break;
             case 2: break;
             default:
                 std::clog << "usage: " << args.at(0) << " [port = 6869]" << std::endl;
                 return 1;
         }

         try {
             // std::stoi yields an int; passing it straight to the uint16_t port
             // parameter would silently wrap values such as 70000, so the range
             // is checked before narrowing.
             constexpr int kMaxPort = 65535;
             const int port = std::stoi(args.at(1));
             if (port < 0 || port > kMaxPort) {
                 std::clog << "port out of range: " << port << std::endl;
                 return 1;
             }

             ba::io_context io;
             Server server(io, static_cast<uint16_t>(port));

             io.run();
         } catch (std::exception const &e) {
             std::clog << e.what() << std::endl;
             return 1;
         }
     }
    
  • 文件client.cpp

     #include <iostream>
     #include <vector>
     #include <boost/asio.hpp>
    
     #include "ipc_common.hpp"
    
     namespace ba = boost::asio;
     using boost::asio::ip::tcp;
    
     /// Client entry point.
     /// Usage: client [host = localhost] [port = 6869] [val1=42] [val2=99]
     /// Sends {val1, val2}, then reads a length-prefixed reply and prints it.
     int main(int argc, char *argv[])
     {
         ba::io_context io;

         std::vector<std::string> args(argv, argv + argc);

         // Each case appends the default for the next missing argument and falls
         // through, so args always ends up with exactly five entries.
         switch (args.size()) {
             case 1: args.push_back("localhost"); [[fallthrough]];
             case 2: args.push_back("6869");      [[fallthrough]];
             case 3: args.push_back("42");        [[fallthrough]];
             case 4: args.push_back("99");        [[fallthrough]];
             case 5: break;
             default:
                 std::clog << "usage: " << args.at(0)
                           << " [host = localhost] [port = 6869] [val1=42] [val2=99]"
                           << std::endl;
                 return 1;
         }

         try {
             // Both values travel as one byte each; reject anything the narrowing
             // assignment would otherwise truncate silently (e.g. 300 -> 44).
             constexpr unsigned long kMaxByte = 255;
             const unsigned long val1 = std::stoul(args.at(3));
             const unsigned long val2 = std::stoul(args.at(4));
             if (val1 > kMaxByte || val2 > kMaxByte) {
                 std::clog << "val1 and val2 must be in [0, 255]" << std::endl;
                 return 1;
             }

             propertiesPacket properties;
             properties.val1 = static_cast<uint8_t>(val1);
             properties.val2 = static_cast<uint8_t>(val2);

             tcp::socket   socket(io);
             tcp::resolver resolver(io);

             // resolve(host, service) replaces the braced form, which builds a
             // deprecated resolver::query that recent Boost releases no longer have.
             ba::connect(socket, resolver.resolve(args.at(1), args.at(2)));
             ba::write(socket, ba::buffer(&properties, sizeof(properties)));

             processedData response{};
             {
                 // The reply starts with the element count, sent by the server
                 // in its native size_t representation.
                 size_t length = 0;
                 ba::read(socket, ba::buffer(&length, sizeof(length)));

                 response.values.resize(length);
             }

             std::clog << "client response size: " << response.values.size() << std::endl;
             ba::read(socket, ba::buffer(response.values));

             std::clog.write(reinterpret_cast<char const*>(response.values.data()),
                             response.values.size()) << "\n";
         } catch (std::exception const &e) {
             std::clog << e.what() << std::endl;
             return 1;
         }
     }
    

演示输出:

便携性

您可能还应该牢记字节顺序。您可以考虑使用 JSON 或其他众所周知的序列化格式。