Boost asio UDP客户端不接收数据
Boost asio UDP client not receiving data
我正在尝试使用 UDP 从服务器向特定客户端发送数据,而不是让客户端先向服务器发送数据,根本不发送服务器数据。问题是客户端使用等待数据
receive_from()
函数,什么也得不到。由于某种原因,服务器确实设法发送数据,并且在发送了整个有效负载后,它自行关闭,但我不知道它将数据发送到哪里。
如果我运行没有客户端的服务器数据也发送成功,我不知道为什么,服务器不是应该阻止send_to()
函数直到数据发送吗?
这是客户端停止的地方(我的部分代码):
void UDPclient::run()
{
std::string relative_path = "assets/";
std::thread thread_context = std::thread([&] {_io_context.run(); }); //start the context.
file_info fileInfo;
boost::asio::io_context::work idleWork(_io_context);
boost::system::error_code error;
udp::socket socket(_io_context); //the file descriptor
WirehairCodec decoder;
udp::endpoint sender;
memset(&fileInfo, sizeof(fileInfo), 0); //reset the struct to 0 (good practice)
std::size_t bytes_transferred = socket.receive_from(
boost::asio::buffer(&fileInfo, sizeof(fileInfo)),
sender);
...
socket.close();
}
服务器(我的部分代码):
int main()
{
std::uint16_t port = 2000;
file_info fileInfo;
std::string filePath;
boost::asio::io_context io_context;
udp::socket socket(io_context, udp::endpoint(udp::v4(), port));
udp::endpoint destination(boost::asio::ip::address::from_string("127.0.0.1"), port);
boost::system::error_code ec;
const WirehairResult initResult = wirehair_init(); //initialize wirehair
if(initResult != Wirehair_Success)
{
std::cout << "failed to initialize wirehair: " << initResult << std::endl;
return -1;
}
if (ec)
std::cerr << ec.message() << std::endl;
else
{
std::cout << "Enter the specified file (Full path) to send: ";
std::cin >> filePath;
while (!boost::filesystem::exists(filePath))
{
std::cerr << "file doesn't exist." << std::endl;
std::cout << "Enter the specified file (Full path) to send: ";
std::cin >> filePath;
}
read_fileToVector(filePath, vBuffer);
file_info fileInfo;
memset(&fileInfo, 0, sizeof(fileInfo)); // Always set the struct values to 0, good practice.
//send file size, name
fileInfo.size = vBuffer.size();
strncpy(fileInfo.fileName, filePath.substr(filePath.find_last_of("/\") + 1).c_str(),
sizeof(fileInfo.fileName) - 1);
std::cout << "name: " << fileInfo.fileName << std::endl;
std::cout << "size: " << fileInfo.size << std::endl;
socket.send_to(boost::asio::buffer(&fileInfo, sizeof(fileInfo)), destination);
socket.wait(socket.wait_write);
...
socket.close();
}
没有足够的代码来说明,但我可以指出一些味道:
memset 调用错误。既然显然 file_info
必须是简单的标准布局(对于 memset 是合法的),为什么不 aggregate-initialize 它用空初始化器代替,效果相同(default-initializing 每个成员)?
您的上下文显然是一个 class 成员,但您要为每个接收启动一个线程?这似乎很奇怪。我希望单线程的生命周期为 io_context/work.
如果你真的想要 reuse/restart 相同 io_context,请记住需要在其间调用 .reset()
。
你也不需要open/close每个接收一个套接字
您可能希望将 UDP 套接字绑定到特定端口以在
上接收
我希望看到的内容类似于:
struct UDPclient {
UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
socket_.bind({{}, port_});
}
bool run()
{
file_info fi{}; // value-initializes all members
udp::endpoint sender;
if (std::size_t n =
socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
{
// don't assume name will be zero terminated
std::string_view name(fi.name.data(),
strnlen(fi.name.data(), fi.name.max_size()));
COUT << "Receiving " << fi.xfer_id << " length " << fi.file_length
<< " name " << std::quoted(name) << " from " << sender
<< std::endl;
decoder_.reset(
wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));
if (!decoder_) {
throw std::runtime_error("wirehair_decoder_create");
}
packet_info packet{};
for (bool data_complete = false; !data_complete;) {
if (auto len = socket_.receive_from(
asio::buffer(&packet, sizeof(packet)), sender);
n >= packet_info::HEADERLEN &&
packet.magic == packet_info::MAGIC &&
len == packet_info::HEADERLEN + packet.block_length) //
{
if (fi.xfer_id != packet.xfer_id)
continue; // TODO concurrent receives
COUT << "(Incoming " << packet.block_length << " for "
<< fi.xfer_id << " from " << sender << ")"
<< std::endl;
// Attempt decode
switch (wirehair_decode(decoder_.get(), packet.id,
packet.block.data(),
packet.block_length)) //
{
case Wirehair_NeedMore: continue; break;
case Wirehair_Success: data_complete = true; break;
default: throw std::runtime_error("wirehair_decode");
}
COUT << "(data complete? " << std::boolalpha
<< data_complete << ")" << std::endl;
}
}
COUT << "Receive completed for " << fi.xfer_id << std::endl;
// try to be safe about interpreting the output name
auto spec = fs::relative(
weakly_canonical(
relative_path /
fs::path(name).lexically_normal().relative_path()),
relative_path);
if (spec.empty())
throw std::runtime_error("invalid file specification " + spec.native());
auto target = relative_path / spec;
fs::create_directories(target.parent_path());
COUT << "Decoding to " << target << " for " << fi.xfer_id
<< std::endl;
std::vector<uint8_t> decoded(fi.file_length);
// Recover original data on decoder side
auto r = wirehair_recover(decoder_.get(), decoded.data(),
decoded.size());
if (r != Wirehair_Success)
throw std::runtime_error("wirehair_recover");
std::ofstream(target, std::ios::binary)
.write(reinterpret_cast<char const*>(decoded.data()),
decoded.size());
}
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr decoder_{};
fs::path relative_path = "assets/";
};
为此,我制定了一个协议,由两种类型的消息组成,魔术头定义为:
namespace /*protocol*/ {
#pragma pack(push, 1)
struct file_info {
boost::endian::big_uint32_t magic, file_length;
boost::uuids::uuid xfer_id;
std::array<char, PATH_MAX + 1> name;
enum : unsigned {
MAGIC = 0xDEFACED,
HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
};
};
static_assert(std::is_trivial_v<file_info>);
static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp
struct packet_info {
boost::endian::big_uint32_t magic, block_length, id;
boost::uuids::uuid xfer_id;
std::array<uint8_t, PACKET_SIZE> block;
enum : unsigned {
MAGIC = static_cast<unsigned>(~0xDEFACED),
HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
};
};
static_assert(std::is_trivial_v<packet_info>);
static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)
// Rule Of Zero, please:
struct WHFree {
void operator()(WirehairCodec c) const { wirehair_free(c); }
};
using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace
另请注意 exception-safe 句柄类型 CodecPtr
可避免泄漏这些资源的任何风险。
使用 Wirehair 的完整演示
出于兴趣,我查看了 Wirehair 编解码器并使用它实现了一个简单的同步文件传输。
- 一个限制是无法传输 zero-length 个文件(在这种情况下
wirehair_encoder_create
失败)。您必须创建一个例外来涵盖此类情况。
- 也没有正常关闭(这是因为无法取消阻塞套接字操作)
- 使接收端异步将立即解锁同时接收多个传输的可能性。我已经将
xfer_id
字段作为相关 ID 放入协议消息中。
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <wirehair/wirehair.h>
namespace asio = boost::asio;
namespace fs = std::filesystem;
using asio::ip::udp;
static constexpr int PACKET_SIZE = 1400;
static constexpr uint16_t DEFAULT_PORT = 9797;
namespace /*protocol*/ {
#pragma pack(push, 1)
struct file_info {
boost::endian::big_uint32_t magic, file_length;
boost::uuids::uuid xfer_id;
std::array<char, PATH_MAX + 1> name;
enum : unsigned {
MAGIC = 0xDEFACED,
HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
};
};
static_assert(std::is_trivial_v<file_info>);
static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp
struct packet_info {
boost::endian::big_uint32_t magic, block_length, id;
boost::uuids::uuid xfer_id;
std::array<uint8_t, PACKET_SIZE> block;
enum : unsigned {
MAGIC = static_cast<unsigned>(~0xDEFACED),
HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
};
};
static_assert(std::is_trivial_v<packet_info>);
static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)
// Rule Of Zero, please:
struct WHFree {
void operator()(WirehairCodec c) const { wirehair_free(c); }
};
using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace
struct UDPclient {
UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
socket_.bind({{}, port_});
}
bool run()
{
file_info fi{}; // value-initializes all members
udp::endpoint sender;
if (std::size_t n =
socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
{
// don't assume name will be zero terminated
std::string_view name(fi.name.data(),
strnlen(fi.name.data(), fi.name.max_size()));
std::cout << "Receiving " << fi.xfer_id << " length "
<< fi.file_length << " name " << std::quoted(name)
<< " from " << sender << std::endl;
decoder_.reset(
wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));
if (!decoder_) {
throw std::runtime_error("wirehair_decoder_create");
}
packet_info packet{};
for (bool data_complete = false; !data_complete;) {
if (auto len = socket_.receive_from(
asio::buffer(&packet, sizeof(packet)), sender);
n >= packet_info::HEADERLEN &&
packet.magic == packet_info::MAGIC &&
len == packet_info::HEADERLEN + packet.block_length) //
{
if (fi.xfer_id != packet.xfer_id)
continue; // TODO concurrent receives
std::cout << "(Incoming " << packet.block_length << " for "
<< fi.xfer_id << " from " << sender << ")"
<< std::endl;
// Attempt decode
switch (wirehair_decode(decoder_.get(), packet.id,
packet.block.data(),
packet.block_length)) //
{
case Wirehair_NeedMore: continue; break;
case Wirehair_Success: data_complete = true; break;
default: throw std::runtime_error("wirehair_decode");
}
std::cout << "(data complete? " << std::boolalpha
<< data_complete << ")" << std::endl;
}
}
std::cout << "Receive completed for " << fi.xfer_id << std::endl;
// try to be safe about interpreting the output name
auto spec = fs::relative(
weakly_canonical(
relative_path /
fs::path(name).lexically_normal().relative_path()),
relative_path);
if (spec.empty())
throw std::runtime_error("invalid file specification " + spec.native());
auto target = relative_path / spec;
fs::create_directories(target.parent_path());
std::cout << "Decoding to " << target << " for " << fi.xfer_id
<< std::endl;
std::vector<uint8_t> decoded(fi.file_length);
// Recover original data on decoder side
auto r = wirehair_recover(decoder_.get(), decoded.data(),
decoded.size());
if (r != Wirehair_Success)
throw std::runtime_error("wirehair_recover");
std::ofstream(target, std::ios::binary)
.write(reinterpret_cast<char const*>(decoded.data()),
decoded.size());
}
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr decoder_{};
fs::path relative_path = "assets/";
};
struct Sender {
Sender(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
}
bool send(fs::path filespec)
{
std::ifstream ifs(filespec, std::ios::binary);
std::vector<uint8_t> const contents(std::istreambuf_iterator<char>(ifs),
{});
ifs.close();
assert(contents.size() == fs::file_size(filespec));
file_info fi{}; // value-initializes all members
fi.magic = file_info::MAGIC;
fi.xfer_id = boost::uuids::random_generator{}();
fi.file_length = contents.size();
strncpy(fi.name.data(), filespec.c_str(), fi.name.size() - 1);
socket_.send_to(asio::buffer(&fi, sizeof(fi)), {{}, port_});
// Create encoder
encoder_.reset(wirehair_encoder_create(nullptr, contents.data(),
contents.size(), PACKET_SIZE));
if (!encoder_) {
throw std::runtime_error("wirehair_encoder_create");
}
auto N = contents.size() / PACKET_SIZE + 1;
N = (N * 10) / 9; // ~10% redundancy
std::cout << "Sending " << filespec << " of " << contents.size()
<< " bytes in " << N << " packets of " << PACKET_SIZE
<< std::endl;
for (unsigned block_id = 1; block_id <= N; ++block_id) {
sleep_for(500ms);
packet_info packet{};
packet.magic = packet_info::MAGIC;
packet.xfer_id = fi.xfer_id;
// Encode a packet
uint32_t writeLen = 0;
if (auto r = wirehair_encode(encoder_.get(), block_id,
packet.block.data(),
packet.block.size(), &writeLen);
r == Wirehair_Success) //
{
packet.id = block_id;
packet.block_length = writeLen;
socket_.send_to(
asio::buffer(&packet, packet_info::HEADERLEN + writeLen),
{{}, port_});
std::cout << "(Packet " << packet.block_length << " bytes)"
<< std::endl;
} else {
throw std::runtime_error("wirehair_encode");
}
}
std::cout << "Send " << filespec << " complete (xfer_id:" << fi.xfer_id
<< ")" << std::endl;
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr encoder_{};
fs::path relative_path = "assets/";
};
int main(int argc, char** argv) {
if (auto r = wirehair_init(); r != Wirehair_Success) {
std::cout << "Wirehair initialization failed: " << r << std::endl;
return 1;
}
asio::thread_pool io(1);
auto ex = io.get_executor();
post(io, [ex] {
UDPclient client{ex};
while (true)
try { client.run(); }
catch (std::exception const& e) { std::cout << "Exception: " << e.what() << "\n"; }
});
Sender sender{ex};
for (auto spec : std::vector(argv + 1, argv + argc)) {
try {
sender.send(spec);
} catch (std::exception const& e) {
std::cout << "Exception: " << e.what() << "\n";
}
}
io.join();
}
我无法在线演示,但您可以使用位于 https://github.com/sehe/wirehair-demo:
的存储库自行构建它
我正在尝试使用 UDP 从服务器向特定客户端发送数据,而不是让客户端先向服务器发送数据,根本不发送服务器数据。问题是客户端使用等待数据
receive_from()
函数,什么也得不到。由于某种原因,服务器确实设法发送数据,并且在发送了整个有效负载后,它自行关闭,但我不知道它将数据发送到哪里。
如果我运行没有客户端的服务器数据也发送成功,我不知道为什么,服务器不是应该阻止send_to()
函数直到数据发送吗?
这是客户端停止的地方(我的部分代码):
void UDPclient::run()
{
std::string relative_path = "assets/";
std::thread thread_context = std::thread([&] {_io_context.run(); }); //start the context.
file_info fileInfo;
boost::asio::io_context::work idleWork(_io_context);
boost::system::error_code error;
udp::socket socket(_io_context); //the file descriptor
WirehairCodec decoder;
udp::endpoint sender;
memset(&fileInfo, sizeof(fileInfo), 0); //reset the struct to 0 (good practice)
std::size_t bytes_transferred = socket.receive_from(
boost::asio::buffer(&fileInfo, sizeof(fileInfo)),
sender);
...
socket.close();
}
服务器(我的部分代码):
int main()
{
std::uint16_t port = 2000;
file_info fileInfo;
std::string filePath;
boost::asio::io_context io_context;
udp::socket socket(io_context, udp::endpoint(udp::v4(), port));
udp::endpoint destination(boost::asio::ip::address::from_string("127.0.0.1"), port);
boost::system::error_code ec;
const WirehairResult initResult = wirehair_init(); //initialize wirehair
if(initResult != Wirehair_Success)
{
std::cout << "failed to initialize wirehair: " << initResult << std::endl;
return -1;
}
if (ec)
std::cerr << ec.message() << std::endl;
else
{
std::cout << "Enter the specified file (Full path) to send: ";
std::cin >> filePath;
while (!boost::filesystem::exists(filePath))
{
std::cerr << "file doesn't exist." << std::endl;
std::cout << "Enter the specified file (Full path) to send: ";
std::cin >> filePath;
}
read_fileToVector(filePath, vBuffer);
file_info fileInfo;
memset(&fileInfo, 0, sizeof(fileInfo)); // Always set the struct values to 0, good practice.
//send file size, name
fileInfo.size = vBuffer.size();
strncpy(fileInfo.fileName, filePath.substr(filePath.find_last_of("/\") + 1).c_str(),
sizeof(fileInfo.fileName) - 1);
std::cout << "name: " << fileInfo.fileName << std::endl;
std::cout << "size: " << fileInfo.size << std::endl;
socket.send_to(boost::asio::buffer(&fileInfo, sizeof(fileInfo)), destination);
socket.wait(socket.wait_write);
...
socket.close();
}
没有足够的代码来说明,但我可以指出一些味道:
memset 调用错误。既然显然
file_info
必须是简单的标准布局(对于 memset 是合法的),为什么不 aggregate-initialize 它用空初始化器代替,效果相同(default-initializing 每个成员)?您的上下文显然是一个 class 成员,但您要为每个接收启动一个线程?这似乎很奇怪。我希望单线程的生命周期为 io_context/work.
如果你真的想要 reuse/restart 相同 io_context,请记住需要在其间调用
.reset()
。你也不需要open/close每个接收一个套接字
您可能希望将 UDP 套接字绑定到特定端口以在
上接收
我希望看到的内容类似于:
struct UDPclient {
UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
socket_.bind({{}, port_});
}
bool run()
{
file_info fi{}; // value-initializes all members
udp::endpoint sender;
if (std::size_t n =
socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
{
// don't assume name will be zero terminated
std::string_view name(fi.name.data(),
strnlen(fi.name.data(), fi.name.max_size()));
COUT << "Receiving " << fi.xfer_id << " length " << fi.file_length
<< " name " << std::quoted(name) << " from " << sender
<< std::endl;
decoder_.reset(
wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));
if (!decoder_) {
throw std::runtime_error("wirehair_decoder_create");
}
packet_info packet{};
for (bool data_complete = false; !data_complete;) {
if (auto len = socket_.receive_from(
asio::buffer(&packet, sizeof(packet)), sender);
n >= packet_info::HEADERLEN &&
packet.magic == packet_info::MAGIC &&
len == packet_info::HEADERLEN + packet.block_length) //
{
if (fi.xfer_id != packet.xfer_id)
continue; // TODO concurrent receives
COUT << "(Incoming " << packet.block_length << " for "
<< fi.xfer_id << " from " << sender << ")"
<< std::endl;
// Attempt decode
switch (wirehair_decode(decoder_.get(), packet.id,
packet.block.data(),
packet.block_length)) //
{
case Wirehair_NeedMore: continue; break;
case Wirehair_Success: data_complete = true; break;
default: throw std::runtime_error("wirehair_decode");
}
COUT << "(data complete? " << std::boolalpha
<< data_complete << ")" << std::endl;
}
}
COUT << "Receive completed for " << fi.xfer_id << std::endl;
// try to be safe about interpreting the output name
auto spec = fs::relative(
weakly_canonical(
relative_path /
fs::path(name).lexically_normal().relative_path()),
relative_path);
if (spec.empty())
throw std::runtime_error("invalid file specification " + spec.native());
auto target = relative_path / spec;
fs::create_directories(target.parent_path());
COUT << "Decoding to " << target << " for " << fi.xfer_id
<< std::endl;
std::vector<uint8_t> decoded(fi.file_length);
// Recover original data on decoder side
auto r = wirehair_recover(decoder_.get(), decoded.data(),
decoded.size());
if (r != Wirehair_Success)
throw std::runtime_error("wirehair_recover");
std::ofstream(target, std::ios::binary)
.write(reinterpret_cast<char const*>(decoded.data()),
decoded.size());
}
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr decoder_{};
fs::path relative_path = "assets/";
};
为此,我制定了一个协议,由两种类型的消息组成,魔术头定义为:
namespace /*protocol*/ {
#pragma pack(push, 1)
struct file_info {
boost::endian::big_uint32_t magic, file_length;
boost::uuids::uuid xfer_id;
std::array<char, PATH_MAX + 1> name;
enum : unsigned {
MAGIC = 0xDEFACED,
HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
};
};
static_assert(std::is_trivial_v<file_info>);
static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp
struct packet_info {
boost::endian::big_uint32_t magic, block_length, id;
boost::uuids::uuid xfer_id;
std::array<uint8_t, PACKET_SIZE> block;
enum : unsigned {
MAGIC = static_cast<unsigned>(~0xDEFACED),
HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
};
};
static_assert(std::is_trivial_v<packet_info>);
static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)
// Rule Of Zero, please:
struct WHFree {
void operator()(WirehairCodec c) const { wirehair_free(c); }
};
using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace
另请注意 exception-safe 句柄类型 CodecPtr
可避免泄漏这些资源的任何风险。
使用 Wirehair 的完整演示
出于兴趣,我查看了 Wirehair 编解码器并使用它实现了一个简单的同步文件传输。
- 一个限制是无法传输 zero-length 个文件(在这种情况下
wirehair_encoder_create
失败)。您必须创建一个例外来涵盖此类情况。 - 也没有正常关闭(这是因为无法取消阻塞套接字操作)
- 使接收端异步将立即解锁同时接收多个传输的可能性。我已经将
xfer_id
字段作为相关 ID 放入协议消息中。
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <wirehair/wirehair.h>
namespace asio = boost::asio;
namespace fs = std::filesystem;
using asio::ip::udp;
static constexpr int PACKET_SIZE = 1400;
static constexpr uint16_t DEFAULT_PORT = 9797;
namespace /*protocol*/ {
#pragma pack(push, 1)
struct file_info {
boost::endian::big_uint32_t magic, file_length;
boost::uuids::uuid xfer_id;
std::array<char, PATH_MAX + 1> name;
enum : unsigned {
MAGIC = 0xDEFACED,
HEADERLEN = sizeof(magic) + sizeof(file_length) + sizeof(xfer_id),
};
};
static_assert(std::is_trivial_v<file_info>);
static_assert(sizeof(file_info) + 8 <= 0xFFFF); // must fit udp
struct packet_info {
boost::endian::big_uint32_t magic, block_length, id;
boost::uuids::uuid xfer_id;
std::array<uint8_t, PACKET_SIZE> block;
enum : unsigned {
MAGIC = static_cast<unsigned>(~0xDEFACED),
HEADERLEN = sizeof(magic) + sizeof(block_length) + sizeof(id) + sizeof(xfer_id)
};
};
static_assert(std::is_trivial_v<packet_info>);
static_assert(sizeof(packet_info) + 8 <= 0xFFFF); // must fit udp
#pragma pack(pop)
// Rule Of Zero, please:
struct WHFree {
void operator()(WirehairCodec c) const { wirehair_free(c); }
};
using CodecPtr = std::unique_ptr<WirehairCodec_t, WHFree>;
} // namespace
struct UDPclient {
UDPclient(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
socket_.bind({{}, port_});
}
bool run()
{
file_info fi{}; // value-initializes all members
udp::endpoint sender;
if (std::size_t n =
socket_.receive_from(asio::buffer(&fi, sizeof(fi)), sender);
n > file_info::HEADERLEN && fi.magic == file_info::MAGIC) //
{
// don't assume name will be zero terminated
std::string_view name(fi.name.data(),
strnlen(fi.name.data(), fi.name.max_size()));
std::cout << "Receiving " << fi.xfer_id << " length "
<< fi.file_length << " name " << std::quoted(name)
<< " from " << sender << std::endl;
decoder_.reset(
wirehair_decoder_create(nullptr, fi.file_length, PACKET_SIZE));
if (!decoder_) {
throw std::runtime_error("wirehair_decoder_create");
}
packet_info packet{};
for (bool data_complete = false; !data_complete;) {
if (auto len = socket_.receive_from(
asio::buffer(&packet, sizeof(packet)), sender);
n >= packet_info::HEADERLEN &&
packet.magic == packet_info::MAGIC &&
len == packet_info::HEADERLEN + packet.block_length) //
{
if (fi.xfer_id != packet.xfer_id)
continue; // TODO concurrent receives
std::cout << "(Incoming " << packet.block_length << " for "
<< fi.xfer_id << " from " << sender << ")"
<< std::endl;
// Attempt decode
switch (wirehair_decode(decoder_.get(), packet.id,
packet.block.data(),
packet.block_length)) //
{
case Wirehair_NeedMore: continue; break;
case Wirehair_Success: data_complete = true; break;
default: throw std::runtime_error("wirehair_decode");
}
std::cout << "(data complete? " << std::boolalpha
<< data_complete << ")" << std::endl;
}
}
std::cout << "Receive completed for " << fi.xfer_id << std::endl;
// try to be safe about interpreting the output name
auto spec = fs::relative(
weakly_canonical(
relative_path /
fs::path(name).lexically_normal().relative_path()),
relative_path);
if (spec.empty())
throw std::runtime_error("invalid file specification " + spec.native());
auto target = relative_path / spec;
fs::create_directories(target.parent_path());
std::cout << "Decoding to " << target << " for " << fi.xfer_id
<< std::endl;
std::vector<uint8_t> decoded(fi.file_length);
// Recover original data on decoder side
auto r = wirehair_recover(decoder_.get(), decoded.data(),
decoded.size());
if (r != Wirehair_Success)
throw std::runtime_error("wirehair_recover");
std::ofstream(target, std::ios::binary)
.write(reinterpret_cast<char const*>(decoded.data()),
decoded.size());
}
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr decoder_{};
fs::path relative_path = "assets/";
};
struct Sender {
Sender(asio::any_io_executor ex, uint16_t port = DEFAULT_PORT)
: socket_(ex, udp::v4())
, port_(port)
{
}
bool send(fs::path filespec)
{
std::ifstream ifs(filespec, std::ios::binary);
std::vector<uint8_t> const contents(std::istreambuf_iterator<char>(ifs),
{});
ifs.close();
assert(contents.size() == fs::file_size(filespec));
file_info fi{}; // value-initializes all members
fi.magic = file_info::MAGIC;
fi.xfer_id = boost::uuids::random_generator{}();
fi.file_length = contents.size();
strncpy(fi.name.data(), filespec.c_str(), fi.name.size() - 1);
socket_.send_to(asio::buffer(&fi, sizeof(fi)), {{}, port_});
// Create encoder
encoder_.reset(wirehair_encoder_create(nullptr, contents.data(),
contents.size(), PACKET_SIZE));
if (!encoder_) {
throw std::runtime_error("wirehair_encoder_create");
}
auto N = contents.size() / PACKET_SIZE + 1;
N = (N * 10) / 9; // ~10% redundancy
std::cout << "Sending " << filespec << " of " << contents.size()
<< " bytes in " << N << " packets of " << PACKET_SIZE
<< std::endl;
for (unsigned block_id = 1; block_id <= N; ++block_id) {
sleep_for(500ms);
packet_info packet{};
packet.magic = packet_info::MAGIC;
packet.xfer_id = fi.xfer_id;
// Encode a packet
uint32_t writeLen = 0;
if (auto r = wirehair_encode(encoder_.get(), block_id,
packet.block.data(),
packet.block.size(), &writeLen);
r == Wirehair_Success) //
{
packet.id = block_id;
packet.block_length = writeLen;
socket_.send_to(
asio::buffer(&packet, packet_info::HEADERLEN + writeLen),
{{}, port_});
std::cout << "(Packet " << packet.block_length << " bytes)"
<< std::endl;
} else {
throw std::runtime_error("wirehair_encode");
}
}
std::cout << "Send " << filespec << " complete (xfer_id:" << fi.xfer_id
<< ")" << std::endl;
return true;
}
private:
udp::socket socket_;
uint16_t port_;
CodecPtr encoder_{};
fs::path relative_path = "assets/";
};
int main(int argc, char** argv) {
if (auto r = wirehair_init(); r != Wirehair_Success) {
std::cout << "Wirehair initialization failed: " << r << std::endl;
return 1;
}
asio::thread_pool io(1);
auto ex = io.get_executor();
post(io, [ex] {
UDPclient client{ex};
while (true)
try { client.run(); }
catch (std::exception const& e) { std::cout << "Exception: " << e.what() << "\n"; }
});
Sender sender{ex};
for (auto spec : std::vector(argv + 1, argv + argc)) {
try {
sender.send(spec);
} catch (std::exception const& e) {
std::cout << "Exception: " << e.what() << "\n";
}
}
io.join();
}
我无法在线演示,但您可以使用位于 https://github.com/sehe/wirehair-demo:
的存储库自行构建它