Boost asio UDP客户端不接收数据

Boost asio UDP client not receiving data

我正在尝试使用 UDP 从服务器向特定客户端发送数据,并且不希望客户端先向服务器发送任何数据。问题是客户端在 receive_from() 函数中等待数据,却什么也收不到。出于某种原因,服务器确实能够发送数据,并且在发送完整个有效负载后自行关闭,但我不知道它把数据发送到了哪里。即使在没有客户端的情况下运行服务器,数据也能发送成功,我不明白为什么:服务器不是应该在 send_to() 函数中一直阻塞,直到数据被发送出去吗?

这是客户端停止的地方(我的部分代码):

// Client side of the question (excerpt; the "..." line marks code omitted
// from this listing): waits for one file_info datagram on a UDP socket.
void UDPclient::run()
{
    std::string relative_path = "assets/";
    // NOTE(review): this thread calls _io_context.run() before idleWork (below)
    // has been constructed, so run() may find no work and return right away.
    std::thread thread_context = std::thread([&] {_io_context.run(); }); // start the context
    file_info fileInfo;

    boost::asio::io_context::work idleWork(_io_context);
    boost::system::error_code error;

    // NOTE(review): the socket is constructed from the io_context only; no
    // open() or bind() call appears in this excerpt before receive_from().
    udp::socket socket(_io_context);
    
    WirehairCodec decoder;

    udp::endpoint sender;
    // NOTE(review): memset's parameters are (dest, value, count). As written,
    // sizeof(fileInfo) is passed as the fill value and 0 as the count, so no
    // bytes of fileInfo are actually cleared.
    memset(&fileInfo, sizeof(fileInfo), 0);

    // Blocking receive of one datagram into fileInfo; `sender` is filled in
    // with the endpoint the datagram came from.
    std::size_t bytes_transferred = socket.receive_from(
        boost::asio::buffer(&fileInfo, sizeof(fileInfo)),
        sender);

    ...

    socket.close();
}

服务器(我的部分代码):

// Server side of the question (excerpt; the "..." line marks omitted code):
// asks for a file path on stdin, loads the file and sends its file_info
// header over UDP.
int main()
{
    std::uint16_t port = 2000;
    file_info fileInfo;
    std::string filePath;
    boost::asio::io_context io_context;

    // Opens an IPv4 UDP socket bound to `port`.
    udp::socket socket(io_context, udp::endpoint(udp::v4(), port));
    // NOTE(review): the destination reuses the very port this socket is bound
    // to, so datagrams for 127.0.0.1:port are addressed to the server's own
    // socket - confirm which port the client actually listens on.
    udp::endpoint destination(boost::asio::ip::address::from_string("127.0.0.1"), port);
    
    boost::system::error_code ec;
    const WirehairResult initResult = wirehair_init(); // initialize wirehair

    if(initResult != Wirehair_Success)
    {
        std::cout << "failed to initialize wirehair: " << initResult << std::endl;
        return -1;
    }

    // NOTE(review): nothing in this excerpt assigns `ec` after its
    // declaration, so this test cannot report a socket error.
    if (ec)
        std::cerr << ec.message() << std::endl;
    else
    { 
        std::cout << "Enter the specified file (Full path) to send: ";
        std::cin >> filePath; 

        // Keep prompting until the entered path exists.
        while (!boost::filesystem::exists(filePath))
        {
            std::cerr << "file doesn't exist." << std::endl;
            std::cout << "Enter the specified file (Full path) to send: ";
            std::cin >> filePath;
        }

        // NOTE(review): vBuffer is not declared anywhere in this excerpt.
        read_fileToVector(filePath, vBuffer);
        // NOTE(review): this declaration shadows the fileInfo declared at the
        // top of main.
        file_info fileInfo;
        memset(&fileInfo, 0, sizeof(fileInfo)); // zero every byte of the header before filling it in

        // Fill in the file size and the file name (the part after the last
        // path separator).
        fileInfo.size = vBuffer.size();
        // NOTE(review): the separator literal below looks like it lost a
        // backslash in transcription (presumably "/\\") - confirm against the
        // original post.
        strncpy(fileInfo.fileName, filePath.substr(filePath.find_last_of("/\") + 1).c_str(), 
                  sizeof(fileInfo.fileName) - 1);
        std::cout << "name: " << fileInfo.fileName << std::endl;
        std::cout << "size: " << fileInfo.size << std::endl;

        // Send the header as a single datagram, then wait until the socket
        // reports it is ready to write again.
        socket.send_to(boost::asio::buffer(&fileInfo, sizeof(fileInfo)), destination);
        socket.wait(socket.wait_write);

        ...
    // NOTE(review): the closing brace of the else-block is not part of this
    // excerpt.
    socket.close();
}

给出的代码不足以确定问题所在,但我可以指出一些代码异味:

  • memset 调用错误(参数顺序颠倒了)。既然 file_info 显然必须是平凡的标准布局类型(这样使用 memset 才合法),为什么不改用空初始化器对它进行聚合初始化呢?效果相同(每个成员都会被值初始化)。

  • 您的上下文显然是一个类成员,但您却为每次接收都启动一个线程?这看起来很奇怪。我期望该线程的生命周期与 io_context/work 保持一致。

    如果你真的想要 reuse/restart 相同 io_context,请记住需要在其间调用 .reset()

  • 你也不需要为每次接收都 open/close 一次套接字

  • 您可能希望将 UDP 套接字绑定到特定端口,以便在该端口上接收数据

我希望看到的内容类似于:

/// Blocking UDP receiver that handles one wirehair-encoded file transfer per
/// run() call.
///
/// The socket is opened and bound once, in the constructor, and reused for
/// every transfer.
struct UDPclient {
    /// Opens an IPv4 UDP socket on executor @p ex and binds it to @p port on
    /// the default-constructed address.
    UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
        socket_.bind({{}, port_});
    }

    /// Waits for one datagram. If it is a file_info header (larger than the
    /// fixed header and carrying file_info::MAGIC), receives packet_info
    /// datagrams for that transfer until the decoder reports success, then
    /// recovers the data and writes it to a file below relative_path.
    ///
    /// @return always true (also when the first datagram was not a header).
    /// @throws std::runtime_error when the decoder cannot be created, when
    ///         decoding or recovery fails, or when the announced file name
    ///         does not resolve to a file inside relative_path.
    bool run()
    {
        file_info fi{}; // value-initializes all members

        udp::endpoint sender;
        if (std::size_t n =
                socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
            n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
        {
            // don't assume name will be zero terminated
            std::string_view name(fi.name.data(),
                                  strnlen(fi.name.data(), fi.name.max_size()));

            COUT << "Receiving " << fi.xfer_id << " length " << fi.file_length
                 << " name " << std::quoted(name) << " from " << sender
                 << std::endl;
            decoder_.reset(
                wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));

            if (!decoder_) {
                throw std::runtime_error("wirehair_decoder_create");
            }

            packet_info packet{};

            for (bool data_complete = false; !data_complete;) {
                // Validate the size of THIS datagram (len) before trusting
                // any header field: a short datagram would otherwise leave
                // stale field values from the previous packet in place.
                if (auto len = socket_.receive_from(
                        asio::buffer(&packet, sizeof(packet)), sender);
                    len >= packet_info::HEADERLEN &&
                    packet.magic == packet_info::MAGIC &&
                    len == packet_info::HEADERLEN + packet.block_length) //
                {
                    if (fi.xfer_id != packet.xfer_id)
                        continue; // TODO concurrent receives

                    COUT << "(Incoming " << packet.block_length << " for "
                         << fi.xfer_id << " from " << sender << ")"
                         << std::endl;
                    // Attempt decode
                    switch (wirehair_decode(decoder_.get(), packet.id,
                                            packet.block.data(),
                                            packet.block_length)) //
                    {
                        case Wirehair_NeedMore: continue; // receive the next packet
                        case Wirehair_Success: data_complete = true; break;
                        default: throw std::runtime_error("wirehair_decode");
                    }
                    COUT << "(data complete? " << std::boolalpha
                         << data_complete << ")" << std::endl;
                }
            }
            COUT << "Receive completed for " << fi.xfer_id << std::endl;

            // try to be safe about interpreting the output name
            auto spec = fs::relative(
                weakly_canonical(
                    relative_path /
                    fs::path(name).lexically_normal().relative_path()),
                relative_path);

            // The name comes from the network. fs::relative() yields a path
            // starting with ".." when the resolved target lies outside
            // relative_path, and "." when it resolves to relative_path
            // itself; neither names a file inside the output directory.
            if (spec.empty() || spec == "." || *spec.begin() == "..")
                throw std::runtime_error("invalid file specification " +
                                         std::string(name));

            auto target = relative_path / spec;
            fs::create_directories(target.parent_path());

            COUT << "Decoding to " << target << " for " << fi.xfer_id
                 << std::endl;
            std::vector<uint8_t> decoded(fi.file_length);

            // Recover original data on decoder side
            auto r = wirehair_recover(decoder_.get(), decoded.data(),
                    decoded.size());

            if (r != Wirehair_Success)
                throw std::runtime_error("wirehair_recover");

            std::ofstream(target, std::ios::binary)
                .write(reinterpret_cast<char const*>(decoded.data()),
                       decoded.size());
        }
        return true;
    }

  private:
    udp::socket socket_;                     // bound receive socket
    uint16_t    port_;                       // local port the socket is bound to
    CodecPtr    decoder_{};                  // decoder of the transfer in progress
    fs::path    relative_path = "assets/";   // directory received files are written under
};

为此,我制定了一个协议,由两种类型的消息组成,魔术头定义为:

namespace /*protocol*/ {
#pragma pack(push, 1)
    /// Header datagram announcing a transfer. Packed to 1-byte alignment and
    /// built from big-endian integer fields, so the object bytes are sent
    /// and received as-is.
    struct file_info {
        boost::endian::big_uint32_t magic, file_length; // type tag; payload size in bytes
        boost::uuids::uuid             xfer_id;         // correlates packets with this header
        std::array<char, PATH_MAX + 1> name;            // file name; receivers must not assume termination

        enum : unsigned {
            MAGIC     = 0xDEFACED,
            // size of the fixed fields that precede `name`
            HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
        };
    };
    static_assert(std::is_trivial_v<file_info>);
    // A datagram is limited to 0xFFFF bytes including the 8-byte UDP header
    // and the (minimum) 20-byte IPv4 header.
    static_assert(sizeof(file_info) + 8 + 20 <= 0xFFFF); // must fit udp

    /// Data datagram carrying one encoded block of a transfer. Only
    /// HEADERLEN + block_length bytes are meaningful.
    struct packet_info {
        boost::endian::big_uint32_t      magic, block_length, id; // type tag; used bytes of `block`; block id
        boost::uuids::uuid               xfer_id;                 // transfer this block belongs to
        std::array<uint8_t, PACKET_SIZE> block;                   // encoded block data
        enum : unsigned {
            MAGIC     = static_cast<unsigned>(~0xDEFACED),
            // size of the fixed fields that precede `block`
            HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
        };
    };
    static_assert(std::is_trivial_v<packet_info>);
    // Same limit as above: UDP (8) and IPv4 (20) headers count towards 0xFFFF.
    static_assert(sizeof(packet_info) + 8 + 20 <= 0xFFFF); // must fit udp
#pragma pack(pop)

    // Rule Of Zero, please:
    /// Deleter that releases a wirehair codec handle with wirehair_free.
    struct WHFree {
        void operator()(WirehairCodec c) const { wirehair_free(c); }
    };

    /// Owning codec handle; the codec is freed when the pointer is reset or
    /// destroyed.
    using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace

另请注意 exception-safe 句柄类型 CodecPtr 可避免泄漏这些资源的任何风险。

使用 Wirehair 的完整演示

出于兴趣,我查看了 Wirehair 编解码器并使用它实现了一个简单的同步文件传输。

  • 一个限制是无法传输零长度的文件(在这种情况下 wirehair_encoder_create 会失败)。您必须针对这种情况做特殊处理。
  • 也没有正常关闭(这是因为无法取消阻塞套接字操作)
  • 使接收端异步将立即解锁同时接收多个传输的可能性。我已经将 xfer_id 字段作为相关 ID 放入协议消息中。
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <wirehair/wirehair.h>

namespace asio = boost::asio;
namespace fs   = std::filesystem;
using asio::ip::udp;

static constexpr int      PACKET_SIZE  = 1400;
static constexpr uint16_t DEFAULT_PORT = 9797;

namespace /*protocol*/ {
#pragma pack(push, 1)
    /// Header datagram announcing a transfer. Packed to 1-byte alignment and
    /// built from big-endian integer fields, so the object bytes are sent
    /// and received as-is.
    struct file_info {
        boost::endian::big_uint32_t magic, file_length; // type tag; payload size in bytes
        boost::uuids::uuid             xfer_id;         // correlates packets with this header
        std::array<char, PATH_MAX + 1> name;            // file name; receivers must not assume termination

        enum : unsigned {
            MAGIC     = 0xDEFACED,
            // size of the fixed fields that precede `name`
            HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
        };
    };
    static_assert(std::is_trivial_v<file_info>);
    // A datagram is limited to 0xFFFF bytes including the 8-byte UDP header
    // and the (minimum) 20-byte IPv4 header.
    static_assert(sizeof(file_info) + 8 + 20 <= 0xFFFF); // must fit udp

    /// Data datagram carrying one encoded block of a transfer. Only
    /// HEADERLEN + block_length bytes are meaningful.
    struct packet_info {
        boost::endian::big_uint32_t      magic, block_length, id; // type tag; used bytes of `block`; block id
        boost::uuids::uuid               xfer_id;                 // transfer this block belongs to
        std::array<uint8_t, PACKET_SIZE> block;                   // encoded block data
        enum : unsigned {
            MAGIC     = static_cast<unsigned>(~0xDEFACED),
            // size of the fixed fields that precede `block`
            HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
        };
    };
    static_assert(std::is_trivial_v<packet_info>);
    // Same limit as above: UDP (8) and IPv4 (20) headers count towards 0xFFFF.
    static_assert(sizeof(packet_info) + 8 + 20 <= 0xFFFF); // must fit udp
#pragma pack(pop)

    // Rule Of Zero, please:
    /// Deleter that releases a wirehair codec handle with wirehair_free.
    struct WHFree {
        void operator()(WirehairCodec c) const { wirehair_free(c); }
    };

    /// Owning codec handle; the codec is freed when the pointer is reset or
    /// destroyed.
    using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace

/// Blocking UDP receiver that handles one wirehair-encoded file transfer per
/// run() call.
///
/// The socket is opened and bound once, in the constructor, and reused for
/// every transfer.
struct UDPclient {
    /// Opens an IPv4 UDP socket on executor @p ex and binds it to @p port on
    /// the default-constructed address.
    UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
        socket_.bind({{}, port_});
    }

    /// Waits for one datagram. If it is a file_info header (larger than the
    /// fixed header and carrying file_info::MAGIC), receives packet_info
    /// datagrams for that transfer until the decoder reports success, then
    /// recovers the data and writes it to a file below relative_path.
    ///
    /// @return always true (also when the first datagram was not a header).
    /// @throws std::runtime_error when the decoder cannot be created, when
    ///         decoding or recovery fails, or when the announced file name
    ///         does not resolve to a file inside relative_path.
    bool run()
    {
        file_info fi{}; // value-initializes all members

        udp::endpoint sender;
        if (std::size_t n =
                socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
            n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
        {
            // don't assume name will be zero terminated
            std::string_view name(fi.name.data(),
                                  strnlen(fi.name.data(), fi.name.max_size()));

            std::cout << "Receiving " << fi.xfer_id << " length "
                      << fi.file_length << " name " << std::quoted(name)
                      << " from " << sender << std::endl;
            decoder_.reset(
                wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));

            if (!decoder_) {
                throw std::runtime_error("wirehair_decoder_create");
            }

            packet_info packet{};

            for (bool data_complete = false; !data_complete;) {
                // Validate the size of THIS datagram (len) before trusting
                // any header field: a short datagram would otherwise leave
                // stale field values from the previous packet in place.
                if (auto len = socket_.receive_from(
                        asio::buffer(&packet, sizeof(packet)), sender);
                    len >= packet_info::HEADERLEN &&
                    packet.magic == packet_info::MAGIC &&
                    len == packet_info::HEADERLEN + packet.block_length) //
                {
                    if (fi.xfer_id != packet.xfer_id)
                        continue; // TODO concurrent receives

                    std::cout << "(Incoming " << packet.block_length << " for "
                              << fi.xfer_id << " from " << sender << ")"
                              << std::endl;
                    // Attempt decode
                    switch (wirehair_decode(decoder_.get(), packet.id,
                                            packet.block.data(),
                                            packet.block_length)) //
                    {
                        case Wirehair_NeedMore: continue; // receive the next packet
                        case Wirehair_Success: data_complete = true; break;
                        default: throw std::runtime_error("wirehair_decode");
                    }
                    std::cout << "(data complete? " << std::boolalpha
                              << data_complete << ")" << std::endl;
                }
            }
            std::cout << "Receive completed for " << fi.xfer_id << std::endl;

            // try to be safe about interpreting the output name
            auto spec = fs::relative(
                weakly_canonical(
                    relative_path /
                    fs::path(name).lexically_normal().relative_path()),
                relative_path);

            // The name comes from the network. fs::relative() yields a path
            // starting with ".." when the resolved target lies outside
            // relative_path, and "." when it resolves to relative_path
            // itself; neither names a file inside the output directory.
            if (spec.empty() || spec == "." || *spec.begin() == "..")
                throw std::runtime_error("invalid file specification " +
                                         std::string(name));

            auto target = relative_path / spec;
            fs::create_directories(target.parent_path());

            std::cout << "Decoding to " << target << " for " << fi.xfer_id
                      << std::endl;
            std::vector<uint8_t> decoded(fi.file_length);

            // Recover original data on decoder side
            auto r = wirehair_recover(decoder_.get(), decoded.data(),
                    decoded.size());

            if (r != Wirehair_Success)
                throw std::runtime_error("wirehair_recover");

            std::ofstream(target, std::ios::binary)
                .write(reinterpret_cast<char const*>(decoded.data()),
                       decoded.size());
        }
        return true;
    }

  private:
    udp::socket socket_;                     // bound receive socket
    uint16_t    port_;                       // local port the socket is bound to
    CodecPtr    decoder_{};                  // decoder of the transfer in progress
    fs::path    relative_path = "assets/";   // directory received files are written under
};

/// Blocking UDP sender: each send() call transmits one file as a file_info
/// header followed by wirehair-encoded packet_info datagrams.
struct Sender {
    /// Opens an (unbound) IPv4 UDP socket on executor @p ex; datagrams are
    /// later addressed to @p port.
    Sender(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
        : socket_(ex, udp::v4())
        , port_(port)
    {
    }

    /// Reads @p filespec completely, announces it with a file_info header and
    /// sends the encoded blocks, pausing 500ms before each one.
    ///
    /// @return always true.
    /// @throws std::runtime_error when the file cannot be opened, is too
    ///         large for the 32-bit length field, or when the encoder cannot
    ///         be created or fails to encode a block.
    bool send(fs::path filespec)
    {
        std::ifstream ifs(filespec, std::ios::binary);
        // Fail loudly in every build mode: the assert below is compiled out
        // under NDEBUG and an unopened stream would just yield no content.
        if (!ifs) {
            throw std::runtime_error("cannot open " + filespec.string());
        }
        std::vector<uint8_t> const contents(std::istreambuf_iterator<char>(ifs),
                                            {});
        ifs.close();
        assert(contents.size() == fs::file_size(filespec));

        // file_info::file_length is a 32-bit field; refuse to truncate.
        if (contents.size() > 0xFFFF'FFFFull) {
            throw std::runtime_error("file too large for protocol: " +
                                     filespec.string());
        }

        // Create encoder first, so that a transfer which cannot be encoded
        // is never announced to the receiver.
        encoder_.reset(wirehair_encoder_create(nullptr, contents.data(),
                                               contents.size(), PACKET_SIZE));
        if (!encoder_) {
            throw std::runtime_error("wirehair_encoder_create");
        }

        file_info fi{}; // value-initializes all members
        fi.magic       = file_info::MAGIC;
        fi.xfer_id     = boost::uuids::random_generator{}();
        fi.file_length = contents.size();
        // size() - 1 keeps the final (already zeroed) byte as a terminator
        strncpy(fi.name.data(), filespec.c_str(), fi.name.size() - 1);

        // NOTE(review): the destination is built from a default-constructed
        // address plus port_; presumably this reaches the local receiver on
        // the author's platform - confirm for other targets.
        socket_.send_to(asio::buffer(&fi, sizeof(fi)), {{}, port_});

        auto N = contents.size() / PACKET_SIZE + 1;
        N      = (N * 10) / 9; // ~10% redundancy

        std::cout << "Sending " << filespec << " of " << contents.size()
                  << " bytes in " << N << " packets of " << PACKET_SIZE
                  << std::endl;

        for (unsigned block_id = 1; block_id <= N; ++block_id) {
            sleep_for(500ms);
            packet_info packet{};
            packet.magic   = packet_info::MAGIC;
            packet.xfer_id = fi.xfer_id;

            // Encode a packet
            uint32_t writeLen = 0;
            if (auto r = wirehair_encode(encoder_.get(), block_id,
                                         packet.block.data(),
                                         packet.block.size(), &writeLen);
                r == Wirehair_Success) //
            {
                packet.id           = block_id;
                packet.block_length = writeLen;
                // Only the header and the used part of the block go on the
                // wire.
                socket_.send_to(
                    asio::buffer(&packet, packet_info::HEADERLEN + writeLen),
                    {{}, port_});
                std::cout << "(Packet " << packet.block_length << " bytes)"
                          << std::endl;
            } else {
                throw std::runtime_error("wirehair_encode");
            }
        }

        std::cout << "Send " << filespec << " complete (xfer_id:" << fi.xfer_id
                  << ")" << std::endl;
        return true;
    }

  private:
    udp::socket socket_;                     // unbound send socket
    uint16_t    port_;                       // destination port
    CodecPtr    encoder_{};                  // encoder of the transfer in progress
    fs::path    relative_path = "assets/";   // not used by send()
};

int main(int argc, char** argv) {
    if (auto r = wirehair_init(); r != Wirehair_Success) {
        std::cout << "Wirehair initialization failed: " << r << std::endl;
        return 1;
    }

    asio::thread_pool io(1); 
    auto ex = io.get_executor();

    post(io, [ex] {
        UDPclient client{ex};
        while (true)
        try { client.run(); }
        catch (std::exception const& e) { std::cout << "Exception: " << e.what() << "\n"; }
    });

    Sender sender{ex};
    for (auto spec : std::vector(argv + 1, argv + argc)) {
        try {
            sender.send(spec);
        } catch (std::exception const& e) {
            std::cout << "Exception: " << e.what() << "\n";
        }
    }

    io.join();
}

我无法提供在线演示,但您可以使用位于 https://github.com/sehe/wirehair-demo 的存储库自行构建它。