c ++线程工作者在高负载下失败

c++ thread worker failure under high load

我一直在研究一个系统的想法,在这个系统中我可以有很多工作人员,这些工作人员由一个中央计时器定期触发 class。我在这里关心的部分是 TriggeredWorker,它在循环中使用 mutex & conditionVariable 方法等待被告知工作。它有一个方法 trigger 被调用(由不同的线程)触发要完成的工作。它是一个抽象 class 必须被子class 编辑才能实现实际的 work 方法。

我有一个测试表明这个机制有效。但是,当我通过减少触发间隔来增加负载时,测试开始失败。当我在触发之间延迟 20 微秒时,测试是 100% 可靠的。当我减少到 1 微秒时,我开始遇到失败,因为执行的工作计数从 1000(预期)减少到 986、933、999 等值。

我的问题是:(1) 到底出了什么问题,我如何捕捉到问题所在,以便我可以报告或采取措施?并且,(2) 有没有更好的方法可以使用?我不得不承认,我使用 C++ 的经验仅限于最近 3 个月,尽管我已经使用其他语言工作了几年。

非常感谢阅读...

以下是代码的关键部分:

触发的worker头文件:

#ifndef TIMER_TRIGGERED_WORKER_H
#define TIMER_TRIGGERED_WORKER_H

#include <thread>
#include <plog/Log.h>

class TriggeredWorker {
private:
    std::mutex mutex_;
    std::condition_variable condVar_;
    std::atomic<bool> running_{false};
    std::atomic<bool> ready_{false};

    void workLoop();
protected:
    virtual void work() {};
public:
    void start();
    void stop();
    void trigger();
};

#endif //TIMER_TRIGGERED_WORKER_H

触发的工作器实现:

#include "TriggeredWorker.h"

void TriggeredWorker::workLoop() {
    PLOGD << "workLoop started...";

    while(true) {
        std::unique_lock<std::mutex> lock(mutex_);
        condVar_.wait(lock, [this]{
            bool ready = this->ready_;
            bool running = this->running_;
            return ready | !running; });
        this->ready_ = false;

        if (!this->running_) {
            break;
        }

        PLOGD << "Calling work()...";
        work();

        lock.unlock();
        condVar_.notify_one();
    }

    PLOGD << "Worker thread completed.";
}

void TriggeredWorker::start() {
    PLOGD << "Worker start...";
    this->running_ = true;
    auto thread = std::thread(&TriggeredWorker::workLoop, this);
    thread.detach();
}

void TriggeredWorker::stop() {
    PLOGD << "Worker stop.";
    this->running_ = false;
}

void TriggeredWorker::trigger() {
    PLOGD << "Trigger.";
    std::unique_lock<std::mutex> lock(mutex_);
    ready_ = true;
    lock.unlock();
    condVar_.notify_one();
}

和测试:

#include "catch.hpp"
#include "TriggeredWorker.h"
#include <thread>

TEST_CASE("Simple worker performs work when triggered") {
    static std::atomic<int> twt_count{0};

    class SimpleTriggeredWorker : public TriggeredWorker {
    protected:
        void work() override {
            PLOGD << "Incrementing counter.";
            twt_count.fetch_add(1);
        }
    };

    SimpleTriggeredWorker worker;

    worker.start();

    for (int i = 0; i < 1000; i++) {
        worker.trigger();
        std::this_thread::sleep_for(std::chrono::microseconds(20));
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    CHECK(twt_count == 1000);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    worker.stop();
}

worker.trigger()workLoop获取锁之前调用了两次会发生什么?您失去了其中一个“触发器”。更小的时间间隔意味着更高的测试失败概率,因为在 workLoop 唤醒之前多次连续 worker.trigger() 调用的概率更高。请注意,没有什么可以保证 workLoop 将在 worker.trigger() 之后但在另一个 worker.trigger() 发生之前获得锁,即使这些调用一个接一个地发生(即不并行)。这是由 OS 调度程序控制的,我们无法控制它。

无论如何,核心问题是设置 ready_ = true 两次会丢失信息。不像递增一个整数两次。因此,最简单的解决方案是将 bool 替换为 int,然后将 inc/dec 替换为 == 0 检查。此解决方案也称为信号量。更高级(可能更好,尤其是当您需要将一些数据传递给 worker 时)的方法是使用(有界?)线程安全队列。这取决于您要实现的目标。

顺便说一句 1:除了 stop() 函数(和 start() 但这并不真正相关)之外,您所有的读取和更新都在锁定状态下进行。我建议您将 stop() 也修复为处于锁定状态(因为它很少被调用)并将原子转换为 non-atomics。目前有不必要的原子开销。

顺便说一句 2:我建议不要使用 thread.detach()。您应该将 std::thread 对象存储在 TriggeredWorker 上,并添加执行 stopjoin 的析构函数。它们不是独立的存在,因此如果没有 detach(),您的代码就会更安全(没有另一个就永远不会死)。