c ++线程工作者在高负载下失败
c++ thread worker failure under high load
我一直在研究一个系统的想法,在这个系统中我可以有很多工作人员,这些工作人员由一个中央计时器定期触发 class。我在这里关心的部分是 TriggeredWorker
,它在循环中使用 mutex
& conditionVariable
方法等待被告知工作。它有一个方法 trigger
被调用(由不同的线程)触发要完成的工作。它是一个抽象 class 必须被子class 编辑才能实现实际的 work
方法。
我有一个测试表明这个机制有效。但是,当我通过减少触发间隔来增加负载时,测试开始失败。当我在触发之间延迟 20 微秒时,测试是 100% 可靠的。当我减少到 1 微秒时,我开始遇到失败,因为执行的工作计数从 1000(预期)减少到 986、933、999 等值。
我的问题是:(1) 到底出了什么问题,我如何捕捉到问题所在,以便我可以报告或采取措施?并且,(2) 有没有更好的方法可以使用?我不得不承认,我使用 C++ 的经验仅限于最近 3 个月,尽管我已经使用其他语言工作了几年。
非常感谢阅读...
以下是代码的关键部分:
触发的worker头文件:
#ifndef TIMER_TRIGGERED_WORKER_H
#define TIMER_TRIGGERED_WORKER_H
#include <thread>
#include <plog/Log.h>
class TriggeredWorker {
private:
std::mutex mutex_;
std::condition_variable condVar_;
std::atomic<bool> running_{false};
std::atomic<bool> ready_{false};
void workLoop();
protected:
virtual void work() {};
public:
void start();
void stop();
void trigger();
};
#endif //TIMER_TRIGGERED_WORKER_H
触发的工作器实现:
#include "TriggeredWorker.h"
void TriggeredWorker::workLoop() {
PLOGD << "workLoop started...";
while(true) {
std::unique_lock<std::mutex> lock(mutex_);
condVar_.wait(lock, [this]{
bool ready = this->ready_;
bool running = this->running_;
return ready | !running; });
this->ready_ = false;
if (!this->running_) {
break;
}
PLOGD << "Calling work()...";
work();
lock.unlock();
condVar_.notify_one();
}
PLOGD << "Worker thread completed.";
}
void TriggeredWorker::start() {
PLOGD << "Worker start...";
this->running_ = true;
auto thread = std::thread(&TriggeredWorker::workLoop, this);
thread.detach();
}
void TriggeredWorker::stop() {
PLOGD << "Worker stop.";
this->running_ = false;
}
void TriggeredWorker::trigger() {
PLOGD << "Trigger.";
std::unique_lock<std::mutex> lock(mutex_);
ready_ = true;
lock.unlock();
condVar_.notify_one();
}
和测试:
#include "catch.hpp"
#include "TriggeredWorker.h"
#include <thread>
TEST_CASE("Simple worker performs work when triggered") {
static std::atomic<int> twt_count{0};
class SimpleTriggeredWorker : public TriggeredWorker {
protected:
void work() override {
PLOGD << "Incrementing counter.";
twt_count.fetch_add(1);
}
};
SimpleTriggeredWorker worker;
worker.start();
for (int i = 0; i < 1000; i++) {
worker.trigger();
std::this_thread::sleep_for(std::chrono::microseconds(20));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(twt_count == 1000);
std::this_thread::sleep_for(std::chrono::seconds(1));
worker.stop();
}
worker.trigger()
在workLoop
获取锁之前调用了两次会发生什么?您失去了其中一个“触发器”。更小的时间间隔意味着更高的测试失败概率,因为在 workLoop
唤醒之前多次连续 worker.trigger()
调用的概率更高。请注意,没有什么可以保证 workLoop
将在 worker.trigger()
之后但在另一个 worker.trigger()
发生之前获得锁,即使这些调用一个接一个地发生(即不并行)。这是由 OS 调度程序控制的,我们无法控制它。
无论如何,核心问题是设置 ready_ = true
两次会丢失信息。不像递增一个整数两次。因此,最简单的解决方案是将 bool
替换为 int
,然后将 inc/dec 替换为 == 0
检查。此解决方案也称为信号量。更高级(可能更好,尤其是当您需要将一些数据传递给 worker 时)的方法是使用(有界?)线程安全队列。这取决于您要实现的目标。
顺便说一句 1:除了 stop()
函数(和 start()
但这并不真正相关)之外,您所有的读取和更新都在锁定状态下进行。我建议您将 stop()
也修复为处于锁定状态(因为它很少被调用)并将原子转换为 non-atomics。目前有不必要的原子开销。
顺便说一句 2:我建议不要使用 thread.detach()
。您应该将 std::thread
对象存储在 TriggeredWorker
上,并添加执行 stop
和 join
的析构函数。它们不是独立的存在,因此如果没有 detach()
,您的代码就会更安全(没有另一个就永远不会死)。
我一直在研究一个系统的想法,在这个系统中我可以有很多工作人员,这些工作人员由一个中央计时器定期触发 class。我在这里关心的部分是 TriggeredWorker
,它在循环中使用 mutex
& conditionVariable
方法等待被告知工作。它有一个方法 trigger
被调用(由不同的线程)触发要完成的工作。它是一个抽象 class 必须被子class 编辑才能实现实际的 work
方法。
我有一个测试表明这个机制有效。但是,当我通过减少触发间隔来增加负载时,测试开始失败。当我在触发之间延迟 20 微秒时,测试是 100% 可靠的。当我减少到 1 微秒时,我开始遇到失败,因为执行的工作计数从 1000(预期)减少到 986、933、999 等值。
我的问题是:(1) 到底出了什么问题,我如何捕捉到问题所在,以便我可以报告或采取措施?并且,(2) 有没有更好的方法可以使用?我不得不承认,我使用 C++ 的经验仅限于最近 3 个月,尽管我已经使用其他语言工作了几年。
非常感谢阅读...
以下是代码的关键部分:
触发的worker头文件:
#ifndef TIMER_TRIGGERED_WORKER_H
#define TIMER_TRIGGERED_WORKER_H
#include <thread>
#include <plog/Log.h>
class TriggeredWorker {
private:
std::mutex mutex_;
std::condition_variable condVar_;
std::atomic<bool> running_{false};
std::atomic<bool> ready_{false};
void workLoop();
protected:
virtual void work() {};
public:
void start();
void stop();
void trigger();
};
#endif //TIMER_TRIGGERED_WORKER_H
触发的工作器实现:
#include "TriggeredWorker.h"
void TriggeredWorker::workLoop() {
PLOGD << "workLoop started...";
while(true) {
std::unique_lock<std::mutex> lock(mutex_);
condVar_.wait(lock, [this]{
bool ready = this->ready_;
bool running = this->running_;
return ready | !running; });
this->ready_ = false;
if (!this->running_) {
break;
}
PLOGD << "Calling work()...";
work();
lock.unlock();
condVar_.notify_one();
}
PLOGD << "Worker thread completed.";
}
void TriggeredWorker::start() {
PLOGD << "Worker start...";
this->running_ = true;
auto thread = std::thread(&TriggeredWorker::workLoop, this);
thread.detach();
}
void TriggeredWorker::stop() {
PLOGD << "Worker stop.";
this->running_ = false;
}
void TriggeredWorker::trigger() {
PLOGD << "Trigger.";
std::unique_lock<std::mutex> lock(mutex_);
ready_ = true;
lock.unlock();
condVar_.notify_one();
}
和测试:
#include "catch.hpp"
#include "TriggeredWorker.h"
#include <thread>
TEST_CASE("Simple worker performs work when triggered") {
static std::atomic<int> twt_count{0};
class SimpleTriggeredWorker : public TriggeredWorker {
protected:
void work() override {
PLOGD << "Incrementing counter.";
twt_count.fetch_add(1);
}
};
SimpleTriggeredWorker worker;
worker.start();
for (int i = 0; i < 1000; i++) {
worker.trigger();
std::this_thread::sleep_for(std::chrono::microseconds(20));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(twt_count == 1000);
std::this_thread::sleep_for(std::chrono::seconds(1));
worker.stop();
}
worker.trigger()
在workLoop
获取锁之前调用了两次会发生什么?您失去了其中一个“触发器”。更小的时间间隔意味着更高的测试失败概率,因为在 workLoop
唤醒之前多次连续 worker.trigger()
调用的概率更高。请注意,没有什么可以保证 workLoop
将在 worker.trigger()
之后但在另一个 worker.trigger()
发生之前获得锁,即使这些调用一个接一个地发生(即不并行)。这是由 OS 调度程序控制的,我们无法控制它。
无论如何,核心问题是设置 ready_ = true
两次会丢失信息。不像递增一个整数两次。因此,最简单的解决方案是将 bool
替换为 int
,然后将 inc/dec 替换为 == 0
检查。此解决方案也称为信号量。更高级(可能更好,尤其是当您需要将一些数据传递给 worker 时)的方法是使用(有界?)线程安全队列。这取决于您要实现的目标。
顺便说一句 1:除了 stop()
函数(和 start()
但这并不真正相关)之外,您所有的读取和更新都在锁定状态下进行。我建议您将 stop()
也修复为处于锁定状态(因为它很少被调用)并将原子转换为 non-atomics。目前有不必要的原子开销。
顺便说一句 2:我建议不要使用 thread.detach()
。您应该将 std::thread
对象存储在 TriggeredWorker
上,并添加执行 stop
和 join
的析构函数。它们不是独立的存在,因此如果没有 detach()
,您的代码就会更安全(没有另一个就永远不会死)。