What is the best way to send many buffers with Boost::Asio method async_send_to
What is the best way to send many buffers with the Boost::Asio method async_send_to?
The whole send process can be repeated at any time. In addition, I would like to determine the (correct) elapsed time of each send run.
I tried it like this:
//MainWindow.h
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = 0);
~MainWindow();
private slots:
void on_connectPushButton_clicked();
void on_asyncSendPushButton_clicked();
private:
Ui::MainWindow *ui;
QTime m_Timer;
int m_BufferSize;
int m_NumBuffersToSend;
int m_TransferredBuffers;
boost::asio::io_service m_IOService;
std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
void handle_send(const boost::system::error_code& error, std::size_t size);
void stopTimerAndLog();
};
//MainWindow.cpp
#include "MainWindow.h"
#include "ui_MainWindow.h"
//Some Qt includes
#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
using boost::asio::ip::udp;
MainWindow::MainWindow(QWidget *parent) :
QMainWindow(parent),
ui(new Ui::MainWindow),
m_BufferSize(0),
m_NumBuffersToSend(0),
m_TransferredBuffers(0)
{
ui->setupUi(this);
}
MainWindow::~MainWindow()
{
delete ui;
}
void MainWindow::on_connectPushButton_clicked()
{
try
{
udp::resolver resolver(m_IOService);
udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
ui->serverPortLineEdit->text().toStdString());
m_ReceiverEndpoint = *resolver.resolve(query);
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
void MainWindow::stopTimerAndLog()
{
int tmm = m_Timer.elapsed();
double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize * m_NumBuffersToSend)
/ ( 1024.0 * 1024.0 * tmm) * 8.0;
LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}
void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
m_TransferredBuffers++;
if (error)
{
//missing error propagation to main thread
LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
}
if ( m_TransferredBuffers >= m_NumBuffersToSend )
{
stopTimerAndLog();
m_IOService.stop();
}
}
void MainWindow::on_asyncSendPushButton_clicked()
{
try
{
m_BufferSize = ui->sendBufferSpinBox->value();
char* data = new char[m_BufferSize];
memset(data, 0, m_BufferSize);
m_NumBuffersToSend = ui->numBufferSpinBox->value();
m_Timer.start();
for (int i=0; i < m_NumBuffersToSend; i++)
{
memset(data, i, m_BufferSize);
m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
m_ReceiverEndpoint,
boost::bind(&MainWindow::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
m_TransferredBuffers = 0;
m_IOService.run();
delete[] data;
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
As you can see, the user can click the connect button (on_connectPushButton_clicked). The send run is then started by clicking the async send button (on_asyncSendPushButton_clicked). There I start the timer and call the async_send_to method m_NumBuffersToSend times. Then I run the IOService. For every async_send_to, the handler handle_send is called and the m_TransferredBuffers variable is incremented until it reaches m_NumBuffersToSend. When that happens, I stop the timer and the IOService.
However, if I compare the time calculated in my program with the real UDP traffic captured with Wireshark, there is always a large difference. How can I calculate the time more accurately?
Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?
Hmm. I'm not sure what you are observing. Here's my answer to
Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked
Yes, you should use io_service::work to keep the IO service running. Here is a demo program:
- I created an IO thread to service the asynchronous operations/completion handlers (a minimal stand-alone sketch of this follows)
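Here is a condensed sketch of that work-guard pattern; the full Demo class below does the same thing, and the names here are only illustrative:

#include <boost/asio.hpp>
#include <memory>
#include <thread>

int main() {
    boost::asio::io_service io;
    // The work object keeps io.run() from returning while no handlers are queued
    std::unique_ptr<boost::asio::io_service::work> work(
        new boost::asio::io_service::work(io));
    // dedicated IO thread servicing all completion handlers
    std::thread io_thread([&io] { io.run(); });

    // ... hand async operations to `io` (e.g. from the GUI thread) here ...

    work.reset();     // let run() return once outstanding handlers have completed
    io_thread.join(); // wait for the IO thread to finish
}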
I removed the Qt dependency; the demo configures the Runs randomly:
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
void stopTimerAndLog() const;
};
As a bonus, I added proper statistics using Boost Accumulators.
We add the samples to the accumulators instead of doing the (expensive) IO inside stopTimerAndLog:
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
}
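For reference, here is a tiny stand-alone Boost.Accumulators example (independent of the networking code, with made-up sample values) showing how samples are fed in and how mean and standard deviation come back out, just like the statistics printed at the end of main() in the full listing:

#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
#include <cmath>
#include <iostream>

int main() {
    using namespace boost::accumulators;
    accumulator_set<double, stats<tag::mean, tag::variance> > acc;

    for (double sample : {153.0, 160.0, 148.0, 171.0}) // illustrative samples
        acc(sample);                                    // feed each sample

    std::cout << "mean: " << mean(acc)
              << ", std.dev.: " << std::sqrt(variance(acc)) << "\n";
}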
You can run many, overlapping demo runs:
Demo demo;
demo.on_connect(argv[1], argv[2]);
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
// Demo destructor joins IO thread, making sure all stats are final
The mutex protecting the statistics is redundant here, but Good Practice(TM), because you may want to test with multiple IO threads; a hypothetical multi-threaded sketch follows below.
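A minimal sketch of that hypothetical multi-threaded variation (not part of the demo): several threads call run() on the same io_service, so completion handlers may execute concurrently and shared state such as the statistics really does need the mutex:

#include <boost/asio.hpp>
#include <thread>
#include <vector>

int main() {
    boost::asio::io_service io;
    boost::asio::io_service::work work(io); // keep run() alive in every thread

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&io] { io.run(); }); // handlers may run on any of these threads

    // ... post asynchronous operations; protect shared data inside the handlers ...

    io.stop();                     // or destroy the work object instead
    for (auto& t : pool) t.join();
}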
Output:
avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w: 160.61 mbps, std.dev. 81.061
avg. time: 153.64 μs, std.dev. 39.0163
Full listing:
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <thread>
#include <mutex>
#include <chrono>
#include <memory>
#include <iostream>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;
namespace demo_results {
using namespace boost::accumulators;
static std::mutex mx;
accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
Clock::duration elapsed;
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
#if 0
std::cout << __FUNCTION__ << " -----------------------------------------------\n";
std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size() << "\n";
std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
std::cout << __FUNCTION__ << ": " << "Time: " << tmm << " μs\n";
std::cout << __FUNCTION__ << ": " << mBitPerSecond << " MBit/s\n";
#endif
}
typedef boost::shared_ptr<Run> Ptr;
};
struct Demo {
boost::asio::io_service m_IOService;
std::unique_ptr<boost::asio::io_service::work> m_work;
std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
std::thread m_io_thread;
Demo() :
m_IOService(),
m_work(new boost::asio::io_service::work(m_IOService)),
m_io_thread([this] { m_IOService.run(); })
{
}
~Demo() {
m_work.reset();
m_io_thread.join();
}
void on_connect(std::string const& host, std::string const& port)
{
try {
udp::resolver resolver(m_IOService);
m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
void perform_run(Run::Ptr state) {
if (state->remainingToSend) {
std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
m_ReceiverEndpoint,
boost::bind(&Demo::handle_sent, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
state));
} else {
state->stopTimerAndLog();
}
}
void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
assert(actually_transferred == state->buffer.size());
state->transferredBuffers += 1;
state->remainingToSend -= 1;
if (error) {
// missing error propagation to main thread
std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
}
perform_run(state); // remaining buffers for run
}
void on_async_testrun() {
perform_run(boost::make_shared<Run>());
}
};
int main(int argc, char const** argv)
{
assert(argc==3);
{
Demo demo;
demo.on_connect(argv[1], argv[2]);
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
} // Demo destructor joins IO thread, making sure all stats are final
using namespace boost::accumulators;
std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. " << sqrt(variance(demo_results::bufsize)) << "\n";
std::cout << "avg. b/w: " << mean(demo_results::mbps) << " mbps, std.dev. " << sqrt(variance(demo_results::mbps)) << "\n";
std::cout << "avg. time: " << mean(demo_results::micros) << " μs, std.dev. " << sqrt(variance(demo_results::micros)) << "\n";
}
Thank you very much for your answer. It is a very good starting point for improving my code.
I changed the way the async_send_to calls are added slightly:
void perform_run(Run::Ptr state) {
for(decltype(state->buffersToSend) i = 0; i < state->buffersToSend; i++ )
{
std::fill(state->buffer.begin(), state->buffer.end(), i);
m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
m_ReceiverEndpoint,
boost::bind(&Demo::handle_sent, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
state));
}
}
void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
assert(actually_transferred == state->buffer.size());
state->transferredBuffers += 1;
if (error) {
// missing error propagation to main thread
std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
}
if (state->transferredBuffers >= state->buffersToSend ) {
state->stopTimerAndLog();
}
}
Here is the full code on coliru.
Greetings,
Thomas