C++11 多线程可取消基于切片的工作
C++11 multithreaded cancellable slice-based work
我正在尝试创建一个基础 class 来管理基于切片的工作负载。
我的方法是创建一个基础摘要 class 来处理工作的 initialization/termination 并在特定的 class 中继承 class 仅指定实际工作和时间安排。
我还在基础 class 中添加了功能,以便在发生一定数量的错误时重新初始化工作负载。
这在一个简单的例子(下面给出)和我拥有的大多数工作负载中按预期工作,但是当我尝试将它用于特定工作负载(读取由 arduino 写入的串行端口)时,它完全混乱从 arduino 读取数据流。
我怀疑我的方法有问题,但我想不出来...
这是我的代码:
sliceWork.h
#pragma once
#include <future>
using namespace ::std;
class sliceWork
{
int sliceIntervalMilliSeconds;
int failureCounter;
int maxFailsBeforeRestart;
char* label = NULL;
promise<void> workPromise;
thread* workerThread = NULL;
virtual void init() = 0;
virtual bool oneSliceWork() = 0;
void work(future<void> future);
public:
sliceWork(int sliceInterval, int maxFails, const char* label);
~sliceWork();
void initWork();
void signalTerminate();
};
sliceWork.cpp
#include <string.h>
#include "sliceWork.h"
sliceWork::sliceWork(int interval, int maxFails, const char* workLabel)
{
sliceIntervalMilliSeconds = interval;
maxFailsBeforeRestart = maxFails;
label = new char[strlen(workLabel) + 1];
strcpy(label, workLabel);
}
sliceWork::~sliceWork()
{
if (workerThread != NULL && workerThread->joinable())
workerThread->join();
printf("destructor %s\n", label);
delete label;
delete workerThread;
}
void sliceWork::initWork()
{
failureCounter = 0;
init();
printf("Init work %s finished!\n", label);
future<void> futureWorker = workPromise.get_future();
workerThread = new thread(&sliceWork::work, this, move(futureWorker));
}
void sliceWork::work(future<void> future)
{
using namespace ::std::chrono;
steady_clock::time_point t0 = steady_clock::now();
while (future.wait_for(chrono::milliseconds(1)) == future_status::timeout)
{
if (duration_cast<chrono::milliseconds>(steady_clock::now() - t0).count()
> sliceIntervalMilliSeconds)
{
if (!oneSliceWork())
{
if (++failureCounter > maxFailsBeforeRestart
&& maxFailsBeforeRestart > 0)
{
init();
failureCounter = 0;
}
}
t0 = steady_clock::now();
}
}
printf("work terminated for %s!\n", label);
}
void sliceWork::signalTerminate()
{
printf("request terminate for work %s...\n", label);
workPromise.set_value();
}
下面是一个按预期工作的使用示例:
main.cpp
#include <string.h>
#include "sliceWork.h"
class A : public sliceWork
{
void init() {
printf("Init A...\n");
}
bool oneSliceWork() {
printf("Working A...\n");
return true;
}
public:
A(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
class B : public sliceWork
{
void init() {
printf("Init B...\n");
}
bool oneSliceWork() {
printf("Working B...\n");
return true;
}
public:
B(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
class C : public sliceWork
{
void init() {
printf("Init C...\n");
}
bool oneSliceWork() {
printf("Working C...\n");
return false;
}
public:
C(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
int main()
{
{
A a(1000, 1000, "A");
a.initWork();
B b(2000, 1000, "B" );
b.initWork();
C c(700, 2, "C" );
c.initWork();
printf("Initializations finished!\n");
::std::this_thread::sleep_for(::std::chrono::seconds(7));
a.signalTerminate();
::std::this_thread::sleep_for(::std::chrono::seconds(5));
b.signalTerminate();
::std::this_thread::sleep_for(::std::chrono::seconds(4));
c.signalTerminate();
}
getchar();
return 0;
}
所以,我想问一下这种方法是否容易出错,因为我实现功能的方式。
应用程序是用 C++11 编写的,目标是 Raspberry PI 3b+ 运行 Raspberry 风格的 Debian 11 (Raspbian),如果相关的话。
从 C++11 开始,我们使用关键字 nullptr
而不是 NULL
宏。而且,std::thread
是可移动的,因此将其用作值而不是指针要好得多:
class sliceWork{
///...
std::thread workerThread;
///...
~sliceWork(){
///...
if (workerThread.joinable())
workerThread.join();
///...
};
///...
void initWork(){
///...
workerThread = thread{[this](){
work(workPromise.get_future());
}};
///...
};
};
我使用 lambda 来初始化线程,而不是您的原始代码;它具有更好的最低性能,同时更具可读性 IMO。
如果你可以使用 C++17,那么我强烈建议使用 std::string_view
而不是旧的 null-terminated 字符串;否则只需使用 std::string
。还始终建议使用构造函数成员初始化列表:
#include <string>
class sliceWork{
///...
std::string_view label;
///...
sliceWork(int interval, int maxFails, std::string_view workLabel):
sliceIntervalMilliSeconds {interval},
maxFailsBeforeRestart {maxFails},
label {workLabel}
{};
///...
};
但是,如果您可以使用 C++20,std::jthread
比 std::thread
具有巨大的优势。因为现在您已经没有 delete
label
或 workerThread
并且 std::jthread
的析构函数自动加入,所以您可以完全删除 sliceWork
的析构函数; default
compiler-provided 析构函数就可以了!!另外,你甚至可以去掉 workPromise
:
class sliceWork{
///...
std::string_view label;
///...
std::jthread workerThread;
///...
//std::promise<void> workPromise;//we don't need this
///...
//~sliceWork()=default;
///...
void signalTerminate(){
///...
workerThread.request_stop();
}
///...
void initWork(){
///...
workerThread = jthread{[this](std::stop_token stop_token){
work(std::move(stop_token));
}};
///...
};
///...
void work(std::stop_token stoken){
///...
for(int ticks{0}; !stoken.stop_requested(); sleep_for(chrono::milliseconds(1)), ++ticks) {
if (ticks > sliceIntervalMilliSeconds) {
///...
}; //if
///...
}; //for
///...
};//work
};
最后一句话:几乎从不使用printf
;它有很多警告。在 C++ 中,我们使用 std::cout
。在multi_threaded应用程序中,std::cout
指令以 << std::endl
结束;这会刷新缓冲区并帮助输出更具可读性。
我正在尝试创建一个基础 class 来管理基于切片的工作负载。
我的方法是创建一个基础摘要 class 来处理工作的 initialization/termination 并在特定的 class 中继承 class 仅指定实际工作和时间安排。
我还在基础 class 中添加了功能,以便在发生一定数量的错误时重新初始化工作负载。
这在一个简单的例子(下面给出)和我拥有的大多数工作负载中按预期工作,但是当我尝试将它用于特定工作负载(读取由 arduino 写入的串行端口)时,它完全混乱从 arduino 读取数据流。
我怀疑我的方法有问题,但我想不出来...
这是我的代码:
sliceWork.h
#pragma once
#include <future>
using namespace ::std;
class sliceWork
{
int sliceIntervalMilliSeconds;
int failureCounter;
int maxFailsBeforeRestart;
char* label = NULL;
promise<void> workPromise;
thread* workerThread = NULL;
virtual void init() = 0;
virtual bool oneSliceWork() = 0;
void work(future<void> future);
public:
sliceWork(int sliceInterval, int maxFails, const char* label);
~sliceWork();
void initWork();
void signalTerminate();
};
sliceWork.cpp
#include <string.h>
#include "sliceWork.h"
sliceWork::sliceWork(int interval, int maxFails, const char* workLabel)
{
sliceIntervalMilliSeconds = interval;
maxFailsBeforeRestart = maxFails;
label = new char[strlen(workLabel) + 1];
strcpy(label, workLabel);
}
sliceWork::~sliceWork()
{
if (workerThread != NULL && workerThread->joinable())
workerThread->join();
printf("destructor %s\n", label);
delete label;
delete workerThread;
}
void sliceWork::initWork()
{
failureCounter = 0;
init();
printf("Init work %s finished!\n", label);
future<void> futureWorker = workPromise.get_future();
workerThread = new thread(&sliceWork::work, this, move(futureWorker));
}
void sliceWork::work(future<void> future)
{
using namespace ::std::chrono;
steady_clock::time_point t0 = steady_clock::now();
while (future.wait_for(chrono::milliseconds(1)) == future_status::timeout)
{
if (duration_cast<chrono::milliseconds>(steady_clock::now() - t0).count()
> sliceIntervalMilliSeconds)
{
if (!oneSliceWork())
{
if (++failureCounter > maxFailsBeforeRestart
&& maxFailsBeforeRestart > 0)
{
init();
failureCounter = 0;
}
}
t0 = steady_clock::now();
}
}
printf("work terminated for %s!\n", label);
}
void sliceWork::signalTerminate()
{
printf("request terminate for work %s...\n", label);
workPromise.set_value();
}
下面是一个按预期工作的使用示例:
main.cpp
#include <string.h>
#include "sliceWork.h"
class A : public sliceWork
{
void init() {
printf("Init A...\n");
}
bool oneSliceWork() {
printf("Working A...\n");
return true;
}
public:
A(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
class B : public sliceWork
{
void init() {
printf("Init B...\n");
}
bool oneSliceWork() {
printf("Working B...\n");
return true;
}
public:
B(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
class C : public sliceWork
{
void init() {
printf("Init C...\n");
}
bool oneSliceWork() {
printf("Working C...\n");
return false;
}
public:
C(int slice, int max, const char* label)
: sliceWork(slice, max, label)
{
}
};
int main()
{
{
A a(1000, 1000, "A");
a.initWork();
B b(2000, 1000, "B" );
b.initWork();
C c(700, 2, "C" );
c.initWork();
printf("Initializations finished!\n");
::std::this_thread::sleep_for(::std::chrono::seconds(7));
a.signalTerminate();
::std::this_thread::sleep_for(::std::chrono::seconds(5));
b.signalTerminate();
::std::this_thread::sleep_for(::std::chrono::seconds(4));
c.signalTerminate();
}
getchar();
return 0;
}
所以,我想问一下这种方法是否容易出错,因为我实现功能的方式。
应用程序是用 C++11 编写的,目标是 Raspberry PI 3b+ 运行 Raspberry 风格的 Debian 11 (Raspbian),如果相关的话。
从 C++11 开始,我们使用关键字 nullptr
而不是 NULL
宏。而且,std::thread
是可移动的,因此将其用作值而不是指针要好得多:
class sliceWork{
///...
std::thread workerThread;
///...
~sliceWork(){
///...
if (workerThread.joinable())
workerThread.join();
///...
};
///...
void initWork(){
///...
workerThread = thread{[this](){
work(workPromise.get_future());
}};
///...
};
};
我使用 lambda 来初始化线程,而不是您的原始代码;它具有更好的最低性能,同时更具可读性 IMO。
如果你可以使用 C++17,那么我强烈建议使用 std::string_view
而不是旧的 null-terminated 字符串;否则只需使用 std::string
。还始终建议使用构造函数成员初始化列表:
#include <string>
class sliceWork{
///...
std::string_view label;
///...
sliceWork(int interval, int maxFails, std::string_view workLabel):
sliceIntervalMilliSeconds {interval},
maxFailsBeforeRestart {maxFails},
label {workLabel}
{};
///...
};
但是,如果您可以使用 C++20,std::jthread
比 std::thread
具有巨大的优势。因为现在您已经没有 delete
label
或 workerThread
并且 std::jthread
的析构函数自动加入,所以您可以完全删除 sliceWork
的析构函数; default
compiler-provided 析构函数就可以了!!另外,你甚至可以去掉 workPromise
:
class sliceWork{
///...
std::string_view label;
///...
std::jthread workerThread;
///...
//std::promise<void> workPromise;//we don't need this
///...
//~sliceWork()=default;
///...
void signalTerminate(){
///...
workerThread.request_stop();
}
///...
void initWork(){
///...
workerThread = jthread{[this](std::stop_token stop_token){
work(std::move(stop_token));
}};
///...
};
///...
void work(std::stop_token stoken){
///...
for(int ticks{0}; !stoken.stop_requested(); sleep_for(chrono::milliseconds(1)), ++ticks) {
if (ticks > sliceIntervalMilliSeconds) {
///...
}; //if
///...
}; //for
///...
};//work
};
最后一句话:几乎从不使用printf
;它有很多警告。在 C++ 中,我们使用 std::cout
。在multi_threaded应用程序中,std::cout
指令以 << std::endl
结束;这会刷新缓冲区并帮助输出更具可读性。