What is the best way to send many buffers with the Boost::Asio method async_send_to?

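What is the best way to send many buffers with the Boost::Asio method async_send_to? The whole send procedure can be repeated at any time. Furthermore, I want to determine the (correct) elapsed time of each send procedure.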

I tried it this way:

//MainWindow.h

class MainWindow : public QMainWindow
{
    Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_connectPushButton_clicked();
    void on_asyncSendPushButton_clicked();
private:
    Ui::MainWindow *ui;
    QTime m_Timer;
    int m_BufferSize;
    int m_NumBuffersToSend;
    int m_TransferredBuffers;
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::ip::udp::socket>     m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    void handle_send(const boost::system::error_code& error, std::size_t size);
    void stopTimerAndLog();
};

//MainWindow.cpp

#include "MainWindow.h"
#include "ui_MainWindow.h"

//Some Qt includes

#include <boost/asio.hpp>
#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <cstring>   // memset
#include <iostream>  // std::cerr

using boost::asio::ip::udp;

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    delete ui;
}


void MainWindow::on_connectPushButton_clicked()
{
    try
    {
        udp::resolver resolver(m_IOService);
        udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
                ui->serverPortLineEdit->text().toStdString());
        m_ReceiverEndpoint = *resolver.resolve(query);
        m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
        m_pSocket->open(udp::v4());
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void MainWindow::stopTimerAndLog()
{
    int tmm = m_Timer.elapsed();
    double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize * m_NumBuffersToSend)
            / ( 1024.0 * 1024.0 * tmm) * 8.0;
    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}

void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    m_TransferredBuffers++;

    if (error)
    {
        //missing error propagation to main thread
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    if ( m_TransferredBuffers >= m_NumBuffersToSend )
    {
        stopTimerAndLog();
        m_IOService.stop();
    }
}

void MainWindow::on_asyncSendPushButton_clicked()
{
    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        char* data = new char[m_BufferSize];
        memset(data, 0, m_BufferSize);
        m_NumBuffersToSend = ui->numBufferSpinBox->value();

        m_Timer.start();

        for (int i=0; i < m_NumBuffersToSend; i++)
        {
            memset(data, i, m_BufferSize);

            m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
                    m_ReceiverEndpoint,
                    boost::bind(&MainWindow::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        }
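        m_TransferredBuffers = 0;
        // handle_send() stops the io_service, so it has to be reset
        // before run() can do any work on a repeated click
        m_IOService.reset();
        m_IOService.run();
        delete[] data;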
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

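As you can see, the user can click the connect button (on_connectPushButton_clicked). The send procedure is then started by clicking the async send button (on_asyncSendPushButton_clicked). Here I start the timer and call async_send_to m_NumBuffersToSend times. Then I run the IOService. For each async_send_to, the handler handle_send is called and m_TransferredBuffers is incremented until it reaches m_NumBuffersToSend. When that happens, I stop the timer and the IOService.

However, if I compare the time calculated in my program with the UDP packets actually sent, as captured with Wireshark, there is always a big difference. How can I measure the time more accurately?

Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?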

Well.

I'm not sure what you are observing. Here is the answer to

Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked

Yes, you should use io_service::work to keep the IO service running. Here's a demo program:

Live On Coliru

  • I created an IO thread to service the async operations/completion handlers
  • I stripped the Qt dependency; the demo Runs are configured randomly:

    struct Run {
        std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
        int remainingToSend      = rand()%10 + 1;
        int transferredBuffers   = 0;
        Clock::time_point start  = Clock::now();
    
        void stopTimerAndLog() const;
    };
    
  • As a bonus, I added proper statistics using Boost Accumulators
  • Instead of doing (expensive) IO in stopTimerAndLog, we add the samples to the accumulators:

    void stopTimerAndLog()
    {
        using namespace std::chrono;
    
        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();
    
        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();
    
        std::lock_guard<std::mutex> lk(demo_results::mx);
    
        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);
    }
    
  • You can run multiple demo runs overlapping:

    Demo demo;
    demo.on_connect(argv[1], argv[2]);
    
    for (int i = 0; i<100; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        demo.on_async_testrun();
    }
    // Demo destructor joins IO thread, making sure all stats are final
    
  • The mutex protecting the statistics is redundant here, but GoodPractice(TM), since you may want to test with multiple IO threads
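
To make the last bullet concrete, here is a minimal sketch of what guarding shared statistics with a mutex can look like using only the standard library. `SharedStats` and its members are hypothetical names standing in for the Boost `accumulator_set` objects in the demo; the running mean and variance use Welford's method, which is a swapped-in technique and not necessarily how Boost Accumulators computes them.

```cpp
#include <cmath>
#include <cstddef>
#include <mutex>

// Hypothetical stand-in for the demo's accumulators: a running mean and
// population variance (Welford's method), with every access guarded by a mutex.
class SharedStats {
public:
    // Safe to call from several IO threads at once.
    void add(double sample) {
        std::lock_guard<std::mutex> lk(mx_);
        ++n_;
        double const delta = sample - mean_;
        mean_ += delta / static_cast<double>(n_);
        m2_   += delta * (sample - mean_);   // uses the updated mean
    }

    std::size_t count() const {
        std::lock_guard<std::mutex> lk(mx_);
        return n_;
    }

    double mean() const {
        std::lock_guard<std::mutex> lk(mx_);
        return mean_;
    }

    // Population variance (divides by n); 0 when no samples were added.
    double variance() const {
        std::lock_guard<std::mutex> lk(mx_);
        return n_ ? m2_ / static_cast<double>(n_) : 0.0;
    }

    // variance() takes the lock itself, so no lock is held here.
    double stddev() const { return std::sqrt(variance()); }

private:
    mutable std::mutex mx_;
    std::size_t n_    = 0;
    double      mean_ = 0.0;
    double      m2_   = 0.0;   // sum of squared deviations from the mean
};
```

Each completion handler would call `add(sample)` on a shared instance. With a single IO thread the lock is never contended, and with several IO threads it keeps the updates from racing.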

Output:

avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w:         160.61 mbps, std.dev. 81.061
avg. time:        153.64 μs, std.dev. 39.0163
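
The bandwidth numbers above come from dividing the bits sent in a run by the run's elapsed time. A minimal standard-library sketch of that arithmetic follows; `mbit_per_second` and `time_call` are hypothetical helper names, and `std::chrono::steady_clock` is my substitution for the listing's `high_resolution_clock`, chosen because it is monotonic. As in the listing, one Mbit is taken as 1024 * 1024 bits.

```cpp
#include <chrono>
#include <cstddef>
#include <limits>

// Throughput in Mbit/s (1 Mbit = 1024 * 1024 bits, matching the listing)
// for `buffers` datagrams of `bufferSize` bytes sent within `elapsed`.
inline double mbit_per_second(std::size_t bufferSize, std::size_t buffers,
                              std::chrono::microseconds elapsed)
{
    if (elapsed.count() <= 0)   // too fast to measure: avoid dividing by zero
        return std::numeric_limits<double>::infinity();

    double const bits    = static_cast<double>(bufferSize)
                         * static_cast<double>(buffers) * 8.0;
    double const seconds = static_cast<double>(elapsed.count()) / 1e6;
    return bits / (1024.0 * 1024.0) / seconds;
}

// Runs `f` once and returns how long it took, measured on a monotonic clock.
template <typename F>
std::chrono::microseconds time_call(F&& f)
{
    auto const start = std::chrono::steady_clock::now();
    f();
    return std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start);
}
```

For example, 1024 buffers of 1024 bytes sent in exactly one second give 8 Mbit/s under this definition. Keep in mind that a completion handler most likely fires once the datagram has been handed to the operating system, not when it leaves the wire, so some difference from a Wireshark capture is to be expected.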

Full Listing

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
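#include <thread>
#include <mutex>
#include <chrono>
#include <memory>
#include <iostream>
#include <vector>
#include <limits>
#include <cmath>    // sqrt
#include <cassert>
#include <cstdlib>  // rand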
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>

using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;

namespace demo_results {
    using namespace boost::accumulators;

    static std::mutex mx;
    accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}

struct Run {
    std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
    int remainingToSend      = rand()%10 + 1;
    int transferredBuffers   = 0;
    Clock::time_point start  = Clock::now();
    Clock::duration elapsed;

    void stopTimerAndLog()
    {
        using namespace std::chrono;

        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();

        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();

        std::lock_guard<std::mutex> lk(demo_results::mx);

        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);

#if 0
        std::cout << __FUNCTION__ << "  -----------------------------------------------\n";
        std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size()      << "\n";
        std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
        std::cout << __FUNCTION__ << ": " << "Time: "        << tmm                << " μs\n";
        std::cout << __FUNCTION__ << ": " << mBitPerSecond   << " MBit/s\n";
#endif
    }

    typedef boost::shared_ptr<Run> Ptr;
};

struct Demo {
    boost::asio::io_service                        m_IOService;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    std::unique_ptr<boost::asio::ip::udp::socket>  m_pSocket;
    boost::asio::ip::udp::endpoint                 m_ReceiverEndpoint;
    std::thread                                    m_io_thread;

    Demo() :
        m_IOService(),
        m_work(new boost::asio::io_service::work(m_IOService)),
        m_io_thread([this] { m_IOService.run(); })
    {
    }

    ~Demo() {
        m_work.reset();
        m_io_thread.join();
    }

    void on_connect(std::string const& host, std::string const& port)
    {
        try {
            udp::resolver resolver(m_IOService);
            m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
            m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
            m_pSocket->open(udp::v4());
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }

    void perform_run(Run::Ptr state) {
        if (state->remainingToSend) {
            std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);

            m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                    m_ReceiverEndpoint,
                    boost::bind(&Demo::handle_sent, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        state));
        } else {
            state->stopTimerAndLog();
        }
    }

    void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
    {
        if (error) {
            // missing error propagation to main thread
            std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error = " << error.message() << ")\n";
            std::cerr << __FUNCTION__ << ": ERROR: Recovering...\n";
        } else {
            // only a successful send is expected to transfer the whole datagram
            assert(actually_transferred == state->buffer.size());
        }

        state->transferredBuffers += 1;
        state->remainingToSend    -= 1;

        perform_run(state); // remaining buffers for run
    }

    void on_async_testrun() {
        perform_run(boost::make_shared<Run>());
    }
};

int main(int argc, char const** argv)
{
    assert(argc==3);

    {
        Demo demo;
        demo.on_connect(argv[1], argv[2]);

        for (int i = 0; i<100; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            demo.on_async_testrun();
        }
    } // Demo destructor joins IO thread, making sure all stats are final

    using namespace boost::accumulators;
    std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. "      << sqrt(variance(demo_results::bufsize)) << "\n";
    std::cout << "avg. b/w:         " << mean(demo_results::mbps)    << " mbps, std.dev. " << sqrt(variance(demo_results::mbps))    << "\n";
    std::cout << "avg. time:        " << mean(demo_results::micros)  << " μs, std.dev. "   << sqrt(variance(demo_results::micros))  << "\n";
}

Thank you very much for your answer. It is a good starting point for improving my code.

I slightly changed the way the async_send_to calls are added.

void perform_run(Run::Ptr state) {

    for(decltype(state->buffersToSend) i = 0; i < state->buffersToSend; i++ )
    {
        std::fill(state->buffer.begin(), state->buffer.end(), i);

        m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                m_ReceiverEndpoint,
                boost::bind(&Demo::handle_sent, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred,
                    state));
    }
}

void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
    assert(actually_transferred == state->buffer.size());
    state->transferredBuffers += 1;

    if (error) {
        // missing error propagation to main thread
        std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
        std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
    }

    if (state->transferredBuffers >= state->buffersToSend ) {
                    state->stopTimerAndLog();
    }
}
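
When all sends are queued up front like this, the run is finished when the last completion handler fires. Here is a small sketch of that counting logic in isolation, assuming the handlers might later be serviced from more than one IO thread; `BatchCounter` is a hypothetical helper, not something Boost.Asio provides, and the `std::atomic` counter is my addition.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical helper: counts the completions of a batch of `expected` sends
// and invokes `onDone` exactly once, from whichever handler finishes last.
class BatchCounter {
public:
    BatchCounter(std::size_t expected, std::function<void()> onDone)
        : expected_(expected), onDone_(std::move(onDone)) {}

    // Call once per completion handler; returns true for the final completion.
    bool complete_one() {
        // fetch_add returns the previous value, so only one caller
        // can ever observe done == expected_.
        std::size_t const done = completed_.fetch_add(1) + 1;
        if (done == expected_) {
            if (onDone_) onDone_();
            return true;
        }
        return false;
    }

    std::size_t completed() const { return completed_.load(); }

private:
    std::size_t              expected_;
    std::function<void()>    onDone_;
    std::atomic<std::size_t> completed_{0};
};
```

In `handle_sent`, the final `if` would become a call to `complete_one()`, with `stopTimerAndLog` passed in as the callback.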

Here is the full code on Coliru.

Regards,

Thomas