Boost Asio 超时方法

Boost Asio Timeout Approach

我遇到了需要在 100 毫秒内从 100 多个客户端收集数据的情况。在那之后我需要处理收集到的数据。处理完成后,需要重新开始从客户端收集数据的步骤,如此循环往复。

为了收集数据,我正在使用当前的实现:

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <iostream>
#include <list>
#include <set>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

struct listener {

    using Buffer = std::array<char, 100>; // receiver buffer
    udp::socket s;

    listener(net::any_io_executor ex, uint16_t port) : s{ex, {{}, port}} {}

    void start() {
        read_loop(error_code{}, -1); // prime the async pump
    }

    void stop() {
        post(s.get_executor(), [this] { s.cancel(); });
    }

    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

private:
    Buffer receive_buffer;
    udp::endpoint         sender;

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;


    void read_loop(error_code ec, size_t bytes) {
        if (bytes != size_t(-1)) {
            // std::cout << "read_loop (" << ec.message() << ")\n";
            if (ec)
                return;

            received_packets += 1;
            unique_senders.insert(sender);
             std::cout << "Received:" << bytes << " sender:" << sender << " recorded:"
             << received_packets << "\n";
             //std::cout <<
            // std::string_view(receive_buffer.data(), bytes) << "\n";
        }
        s.async_receive_from(net::buffer(receive_buffer), sender,
                             std::bind_front(&listener::read_loop, this));
    };
};

// Collect UDP packets on three ports for 100 ms, then report totals.
int main() {
    net::thread_pool io(1); // single threaded

    using Timer = net::steady_timer;
    using TimePoint = std::chrono::steady_clock::time_point;
    using Clock = std::chrono::steady_clock;

    Timer timer_(io);
    std::list<listener> listeners;

    // Invoke a member function on every listener.
    auto each = [&](auto mf) { for (auto& l : listeners) (l.*mf)(); };

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(io.get_executor(), port);

    each(&listener::start);

    TimePoint startTP = Clock::now();
    timer_.expires_at(startTP + 100ms); // collect data for 100 ms
    timer_.async_wait([&](auto &&){each(&listener::stop);});

    // BUG FIX: join BEFORE reporting. The original printed "Done" and the
    // report here, racing with the pool thread that was still mutating
    // received_packets/unique_senders, and reported before the 100 ms
    // collection window had even elapsed.
    io.join();

    std::cout << "Done ! \n";
    each(&listener::report);
}

是否有方法可以停止这个收集过程?

TimePoint startTP = Clock::now();
timer_.expires_at(startTP + 100ms); // collect data for 100 ms
timer_.async_wait([&](auto &&){each(&listener::stop);});

我将其解释为基本上是在询问如何组合:

  • 我之前给出的那些示例(单侦听器的时间片示例与多侦听器示例)

这也反映在您的评论中 :

I have one bottleneck. According to last code what you shared with me : [...] I tried to add the condition : record 100 ms of the data and after resume the sockets, go to process collected data. When is done, start again 100 ms to collect data from sockets and again process for 900 ms etc... The problem is that each listener now have its own current time. I am thinking how to have everything in one place, and when 100 ms is elapsed, notify all 'listeners' to resume using "stop() function provided by you".

使用我在第一个 (single-listener) 示例中使用的相同 time-slice 计算似乎更容易。

我计算时间片的方法的重点在于:它允许与时钟保持同步而不产生时间漂移(time-drift)。它的美妙之处在于,这种方法可以 1:1 地推广到多侦听器(multi-listener)的场景。

这是每个侦听器 1 个计时器但同步时间片的组合,创建方式与我从原始答案代码创建 multi-listener 示例的方式完全相同:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>

#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <list>
#include <set>
#include <thread>

namespace net = boost::asio;
using net::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::placeholders;

using Clock = std::chrono::steady_clock;
using Timer = net::steady_timer;
// One full cycle: 100 ms of recording ("armed") followed by 900 ms idle.
constexpr auto period      = 1s;    // length of one complete collect/process cycle
constexpr auto armed_slice = 100ms; // leading portion of each cycle during which packets are recorded

// UDP listener that records packets only during the "armed" slice at the
// start of every period. All listeners compute their slices from one shared
// start time, so they stay in lock-step with no per-listener clock drift.
struct listener {
    udp::socket s;

    // `start` is the shared epoch that all listeners slice time against;
    // `ex` and `port` configure the UDP socket.
    listener(Clock::time_point start, net::any_io_executor ex, uint16_t port)
        : s{ex, {{}, port}}
        , start_{start} {}

    // Prime the async receive chain. The sentinel byte count -1 together
    // with event::init marks the initial call so it is not counted.
    void start() {
        read_loop(event::init, error_code{}, -1); // prime the async pump
    }

    // Thread-safe shutdown: run the cancellation on the socket's executor
    // so it cannot race with handlers executing there. Cancels both the
    // pending receive and the slice timer.
    void stop() {
        post(s.get_executor(), [this] {
            stopped_ = true;
            s.cancel();
            timer.cancel();
        });
    }

    // Print totals gathered so far. NOTE(review): the counters are not
    // synchronized, so this is only safe once the I/O thread has drained
    // (the sample main calls it after io.join()).
    void report() const {
        std::cout << s.local_endpoint() << ": A total of " << received_packets
                  << " were received from " << unique_senders.size()
                  << " unique senders\n";
    }

  private:
    std::atomic_bool stopped_{false};
    // Which path delivered us into read_loop: the initial priming call,
    // a slice-timer expiry (resume), or a completed receive.
    enum class event { init, resume, receive };

    Clock::time_point const start_;                 // shared slice epoch
    Timer                   timer{s.get_executor()};
    std::array<char, 100>   receive_buffer;
    udp::endpoint           sender;                 // filled by async_receive_from

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    // Single state machine for all three event sources; unless stopped or
    // parked on the timer, it always ends by re-arming async_receive_from.
    void read_loop(event ev, error_code ec, [[maybe_unused]] size_t bytes) {
        if (stopped_)
            return;
        auto const now    = Clock::now();
        auto const relnow = now - start_; // elapsed since the shared epoch
        switch (ev) {
        case event::receive:
            // std::cout << s.local_endpoint() << "receive (" << ec.message()
            //<< ")\n";
            if (ec)
                return; // cancelled or failed: stop the chain

            // Past the armed slice of the current period: drop this
            // datagram and park on the timer until the next period starts.
            if ((relnow % period) > armed_slice) {
                // ignore this receive

                // wait for next slice
                auto next_slice = start_ + period * (relnow / period + 1);
                std::cout << s.local_endpoint() << " Waiting "
                          << (next_slice - now) / 1ms << "ms ("
                          << received_packets << " received)\n";
                timer.expires_at(next_slice);
                return timer.async_wait(std::bind(&listener::read_loop, this,
                                                  event::resume, _1, 0));
            } else {
                received_packets += 1;
                unique_senders.insert(sender);
                /*
                 *std::cout << s.local_endpoint() << " Received:" << bytes
                 *          << " sender:" << sender
                 *          << " recorded:" << received_packets << "\n";
                 *std::cout << std::string_view(receive_buffer.data(), bytes)
                 *          << "\n";
                 */
            }
            break;
        case event::resume:
            //std::cout << "resume (" << ec.message() << ")\n";
            if (ec)
                return; // timer cancelled by stop()
            break;
        case event::init:
            //std::cout << s.local_endpoint() << " init " << (now - start_) / 1ms << "ms\n";
            break;
        };
        // Re-arm the receive for the next datagram.
        s.async_receive_from(
            net::buffer(receive_buffer), sender,
            std::bind_front(&listener::read_loop, this, event::receive));
    }
};

// Run three synchronized listeners for five seconds, then report.
int main() {
    net::thread_pool io(1); // single threaded

    std::list<listener> listeners;

    // Apply a member function to every listener in turn.
    auto broadcast = [&listeners](auto mf) {
        for (auto& l : listeners)
            (l.*mf)();
    };

    auto const start = Clock::now();

    for (uint16_t port : {1234, 1235, 1236})
        listeners.emplace_back(start, io.get_executor(), port);

    broadcast(&listener::start);

    // Let the listeners run for five seconds, then shut them down.
    std::this_thread::sleep_for(5s);
    broadcast(&listener::stop);

    io.join(); // wait for all pending handlers to drain

    broadcast(&listener::report);
}

现场演示:

编辑:如果输出滚动得太快、来不及查看,这里是一段示例输出:

0.0.0.0:1234 Waiting 899ms (1587 received)
0.0.0.0:1236 Waiting 899ms (1966 received)
0.0.0.0:1235 Waiting 899ms (1933 received)
0.0.0.0:1235 Waiting 899ms (4054 received)
0.0.0.0:1234 Waiting 899ms (3454 received)
0.0.0.0:1236 Waiting 899ms (4245 received)
0.0.0.0:1236 Waiting 899ms (6581 received)
0.0.0.0:1235 Waiting 899ms (6257 received)
0.0.0.0:1234 Waiting 899ms (5499 received)
0.0.0.0:1235 Waiting 899ms (8535 received)
0.0.0.0:1234 Waiting 899ms (7494 received)
0.0.0.0:1236 Waiting 899ms (8811 received)
0.0.0.0:1236 Waiting 899ms (11048 received)
0.0.0.0:1234 Waiting 899ms (9397 received)
0.0.0.0:1235 Waiting 899ms (10626 received)
0.0.0.0:1234: A total of 9402 were received from 7932 unique senders
0.0.0.0:1235: A total of 10630 were received from 8877 unique senders
0.0.0.0:1236: A total of 11053 were received from 9133 unique senders

如果您确定自己仍然是单线程的,您可以考虑让所有侦听器共用同一个计时器实例,但代价是复杂性会显著增加。