C++11 多线程可取消基于切片的工作

C++11 multithreaded cancellable slice-based work

我正在尝试创建一个基础 class 来管理基于切片的工作负载。
我的方法是创建一个基础摘要 class 来处理工作的 initialization/termination 并在特定的 class 中继承 class 仅指定实际工作和时间安排。
我还在基础 class 中添加了功能,以便在发生一定数量的错误时重新初始化工作负载。

这在一个简单的例子(下面给出)和我拥有的大多数工作负载中按预期工作,但是当我尝试将它用于特定工作负载(读取由 arduino 写入的串行端口)时,它完全混乱从 arduino 读取数据流。
我怀疑我的方法有问题,但我想不出来...

这是我的代码:

sliceWork.h

#pragma once
#include <future>
using namespace ::std;

class sliceWork
{
    int sliceIntervalMilliSeconds;
    int failureCounter;
    int maxFailsBeforeRestart;
    char* label = NULL;
    
    promise<void> workPromise;
    thread* workerThread = NULL;

    virtual void init() = 0;
    virtual bool oneSliceWork() = 0;
    void work(future<void> future);

public:
    sliceWork(int sliceInterval, int maxFails, const char* label);
    ~sliceWork();
    void initWork();
    void signalTerminate();
};

sliceWork.cpp

#include <string.h>
#include "sliceWork.h"

sliceWork::sliceWork(int interval, int maxFails, const char* workLabel)
{
    sliceIntervalMilliSeconds = interval;
    maxFailsBeforeRestart = maxFails;
    label = new char[strlen(workLabel) + 1];
    strcpy(label, workLabel);
}

sliceWork::~sliceWork()
{
    if (workerThread != NULL && workerThread->joinable())
        workerThread->join();
    printf("destructor %s\n", label);
    delete label;
    delete workerThread;
}

void sliceWork::initWork()
{
    failureCounter = 0;
    init();
    printf("Init work %s finished!\n", label);
    future<void> futureWorker = workPromise.get_future();
    workerThread = new thread(&sliceWork::work, this, move(futureWorker));
}

void sliceWork::work(future<void> future)
{
    using namespace ::std::chrono;
    steady_clock::time_point t0 = steady_clock::now();

    while (future.wait_for(chrono::milliseconds(1)) == future_status::timeout)
    {
        if (duration_cast<chrono::milliseconds>(steady_clock::now() - t0).count() 
            > sliceIntervalMilliSeconds)
        {
            if (!oneSliceWork())
            {
                if (++failureCounter > maxFailsBeforeRestart 
                    && maxFailsBeforeRestart > 0)
                {
                    init();
                    failureCounter = 0;
                }
            }
            t0 = steady_clock::now();
        }
    }
    printf("work terminated for %s!\n", label);
}

void sliceWork::signalTerminate()
{
    printf("request terminate for work %s...\n", label);
    workPromise.set_value();
}

下面是一个按预期工作的使用示例:

main.cpp

#include <string.h>
#include "sliceWork.h"

class A : public sliceWork
{
    void init() {
        printf("Init A...\n");
    }

    bool oneSliceWork() {
        printf("Working A...\n");
        return true;
    }
public:
    A(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

class B : public sliceWork
{
    void init() {
        printf("Init B...\n");
    }

    bool oneSliceWork() {
        printf("Working B...\n");
        return true;
    }
public:
    B(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

class C : public sliceWork
{
    void init() {
        printf("Init C...\n");
    }

    bool oneSliceWork() {
        printf("Working C...\n");
        return false;
    }
public:
    C(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

int main()
{
     {
         A a(1000, 1000, "A");
         a.initWork();
         B b(2000, 1000, "B" );
         b.initWork();
         C c(700, 2, "C" );
         c.initWork();
         printf("Initializations finished!\n");
         ::std::this_thread::sleep_for(::std::chrono::seconds(7));
         a.signalTerminate();
         ::std::this_thread::sleep_for(::std::chrono::seconds(5));
         b.signalTerminate();
         ::std::this_thread::sleep_for(::std::chrono::seconds(4));
         c.signalTerminate();
     }
     getchar();
     return 0;
}

所以,我想问一下这种方法是否容易出错,因为我实现功能的方式。

应用程序是用 C++11 编写的,目标是 Raspberry PI 3b+ 运行 Raspberry 风格的 Debian 11 (Raspbian),如果相关的话。

从 C++11 开始,我们使用关键字 nullptr 而不是 NULL 宏。而且,std::thread 是可移动的,因此将其用作值而不是指针要好得多:

class sliceWork{
    ///...
    std::thread workerThread;
    ///...
    ~sliceWork(){
        ///...
        if (workerThread.joinable())
            workerThread.join();
        ///...
    };
    ///...
    void initWork(){
        ///...
        workerThread = thread{[this](){
            work(workPromise.get_future());
        }};
        ///...
    };
};

我使用 lambda 来初始化线程,而不是您的原始代码;它具有更好的最低性能,同时更具可读性 IMO。

如果你可以使用 C++17,那么我强烈建议使用 std::string_view 而不是旧的 null-terminated 字符串;否则只需使用 std::string。还始终建议使用构造函数成员初始化列表:

#include <string>
class sliceWork{
    ///...
    std::string_view label;
    ///...
    sliceWork(int interval, int maxFails, std::string_view workLabel):
        sliceIntervalMilliSeconds {interval},
        maxFailsBeforeRestart {maxFails},
        label {workLabel}
    {};
    ///...
};

但是,如果您可以使用 C++20,std::jthreadstd::thread 具有巨大的优势。因为现在您已经没有 delete labelworkerThread 并且 std::jthread 的析构函数自动加入,所以您可以完全删除 sliceWork 的析构函数; default compiler-provided 析构函数就可以了!!另外,你甚至可以去掉 workPromise:

class sliceWork{
    ///...
    std::string_view label;
    ///...
    std::jthread workerThread;
    ///...
    //std::promise<void> workPromise;//we don't need this
    ///...
    //~sliceWork()=default;
    ///...
    void signalTerminate(){
    ///...
        workerThread.request_stop();
    }
    ///...
    void initWork(){
        ///...
        workerThread = jthread{[this](std::stop_token stop_token){
            work(std::move(stop_token));
        }};
        ///...
    };
    ///...
    void work(std::stop_token stoken){
        ///...
        for(int ticks{0}; !stoken.stop_requested(); sleep_for(chrono::milliseconds(1)), ++ticks) {
        if (ticks > sliceIntervalMilliSeconds) {
                ///...
            }; //if
            ///...
        }; //for
        ///...
    };//work
};

最后一句话:几乎从不使用printf;它有很多警告。在 C++ 中,我们使用 std::cout。在multi_threaded应用程序中,std::cout指令以 << std::endl结束;这会刷新缓冲区并帮助输出更具可读性。