使用 concurrent_priority_queue C++ 优化计时器队列的最佳方法

Best way to optimize timer queue with concurrent_priority_queue C++

我现在正在使用 concurrent_priority_queue 处理计时器队列..

我实现了在这个队列中执行最紧急事件的基本逻辑。

这是我的代码。

TimerEvent ev{};
while (timer.mLoop)
{
    while (timer.mQueue.empty() == false)
    {
        if (timer.mQueue.try_pop(ev) == false)
            continue;

        if (ev.Type == EVENT_TYPE::PHYSICS) // Physics event is around 15 ~ 17ms
        {
            auto now = Clock::now();            
            std::this_thread::sleep_for(ev.StartTime - now);
            timer.mGameServerPtr->PostPhysicsOperation(ev.WorldID);
        }
        else if (ev.Type == EVENT_TYPE::INVINCIBLE) // This event is 3sec long.
        {
            auto now = Clock::now();
            std::this_thread::sleep_for(ev.StartTime - now); // This is wrong!!
            timer.mGameServerPtr->ReleaseInvincibleMode(ev.WorldID);
        }
    }
    std::this_thread::sleep_for(10ms);
}

如果concurrent_priority_queue.

中有类似front/top的方法,问题就迎刃而解了

但是class中没有这样的方法,因为它不是线程安全的。

因此,我只是将事件从队列中弹出并一直等到事件开始时间。 这样我就不用再往队列里插入事件了。

但问题是,如果我有另一种类型的事件,如 EVENT_TYPE::INVINCIBLE,那么我不应该只使用 sleep_for,因为这个事件将近 3 秒长。等待3秒时,PHYSICS事件没有及时执行。

我可以对 PHYSIC 事件使用 sleep_for 方法,因为它等待的时间最短。 但我必须将 INVINCIBLE 事件重新插入队列。

如何在不将事件重新插入队列的情况下优化此计时器?

How can I optimize this timer without re-insert event into queue again?

从外观上看,使用您当前使用的 concurrent_priority_queue 的实现会很困难。如果您只是使用标准 std::priority_queue 并在需要的地方添加一些锁定,那并不难。

示例:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>

using Clock = std::chrono::steady_clock;
using time_point = std::chrono::time_point<Clock>;

struct TimerEvent {
    void operator()() { m_event(); }

    bool operator<(const TimerEvent& rhs) const {
        return rhs.StartTime < StartTime;
    }

    time_point StartTime;
    std::function<void()> m_event; // what to execute when the timer is due
};

class TimerQueue {
public:
    ~TimerQueue() { shutdown(); }
    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

    // add a new TimerEvent to the queue
    template<class... Args>
    void emplace(Args&&... args) {
        std::scoped_lock lock(m_mutex);

        m_queue.emplace(TimerEvent{std::forward<Args>(args)...});
        m_cv.notify_all();
    }

    // Wait until it's time to fire the event that is first in the queue
    // which may change while we are waiting, but that'll work too.
    bool wait_pop(TimerEvent& ev) {
        std::unique_lock lock(m_mutex);

        while(!m_shutdown &&
              (m_queue.empty() || Clock::now() < m_queue.top().StartTime))
        {
            if(m_queue.empty()) { // wait "forever"
                m_cv.wait(lock);
            } else { // wait until first StartTime
                auto st = m_queue.top().StartTime;
                m_cv.wait_until(lock, st);
            }
        }
        if(m_shutdown) return false; // time to quit

        ev = std::move(m_queue.top()); // extract event
        m_queue.pop();

        return true;
    }

private:
    std::priority_queue<TimerEvent> m_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_shutdown{};
};

如果在 wait_pop 当前等待的事件之前到期的事件到来,m_cv.wait/m_cv.wait_until 将解除阻塞(因为 m_cv.notify_all() in emplace()) 并且新元素将排在队列中的第一个。

事件循环可以简单地是:

void event_loop(TimerQueue& tq) {
    TimerEvent te;
    while(tq.wait_pop(te)) {
        te(); // execute event
    }
    // the queue was shutdown, exit thread
}

并且您可以将任何类型的可调用对象与您希望在该队列中触发的时间点放在一起。

#include <thread>

int main() {
    TimerQueue tq;

    // create a thread to run the event loop
    auto ev_th = std::thread(event_loop, std::ref(tq));

    // wait a second
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // add an event in 5 seconds
    tq.emplace(Clock::now() + std::chrono::seconds(5), [] {
        std::cout << "second\n";
    });

    // wait a second
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // add an event in 2 seconds
    tq.emplace(Clock::now() + std::chrono::seconds(2), [] {
        std::cout << "first\n";
    });

    // sleep some time
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // shutdown, only the event printing "first" will have fired
    tq.shutdown();
    ev_th.join();
}

Demo with logging