通过安装信号处理程序关闭多线程应用程序

Shutting down a multithreaded application by installing a signal handler

在下面的代码中,我创建了一个玩具 class,它有一个线程写入队列,而另一个线程从该队列读取并将其打印到 stdout。现在,为了彻底关闭系统,我为 SIGINT 设置了一个处理程序。我期待信号处理程序设置 std::atomic<bool> 变量 stopFlag,这将导致 threadB 将毒丸(哨兵)推到遇到 threadA 的队列中停。

class TestClass
{
public:

    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;


private:
    void init();
    void postResults();
    std::string getResult();
    void processResults();

    std::atomic<bool> stopFlag;

    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;

    std::unique_ptr<std::thread> threadA;
    std::unique_ptr<std::thread> threadB;
};

void TestClass::init()
{
    threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
    threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}

TestClass::TestClass():
    stopFlag(false)
{
    init();
}

TestClass::~TestClass()
{
    threadB->join();
}

void TestClass::postResults()
{
    while(true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
        if(stopFlag)
        {
            /*For shutting down output thread*/
            auto poisonPill = std::string();
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                outQueue.push(poisonPill);
                outQueueConditionVariable.notify_one();
            }
            threadA->join();
            break;
        }
    }
}

void TestClass::shutDown()
{
    stopFlag = true;
}

std::string TestClass::getResult()
{
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(outQueue.empty())
        {
            outQueueConditionVariable.wait(lock);
        }
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true)
    {
        const auto result = getResult();

        if(result.empty())
        {
            break;
        }

        std::cout << result << std::endl;

    }
}

static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
    t->shutDown();
}
static std::function<void(int)> handler;

int main()
{
    auto testClass = std::make_shared<TestClass>();
    handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
    std::signal(SIGINT, [](int n){ handler(n);});
    return 0;
}

我使用 gcc 5.2 使用 -std=c++14 标志编译了这个。在我的 CentOS 7 机器上按 Ctrl-C,出现以下错误,

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Aborted (core dumped)

请帮助我了解发生了什么。

在您的平台上,当真正的 SIGINT 信号到来时,将调用此信号处理程序。 The list of functions that can be invoked inside of this signal handler 相当有限,调用任何其他内容都会导致未定义的行为。

您的 main 函数会立即退出并销毁全局 handler 对象,然后 testClass。然后主线程在 TestClass::~TestClass 中被阻塞。信号处理程序最终会访问已经销毁的对象,这会导致未定义的行为。

根本原因是由于共享指针导致未定义的对象所有权 - 你不知道什么以及什么时候结束了你的对象。


一种更通用的方法是使用另一个线程来处理所有信号并阻止所有其他线程中的信号。然后,该信号处理线程可以在收到信号后调用任何函数。

你也根本不需要这里的智能指针和函数包装器。

示例:

class TestClass
{
public:
    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;

private:
    void postResults();
    std::string getResult();
    void processResults();


    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;
    bool stop = false;

    std::thread threadA;
    std::thread threadB;
};

TestClass::TestClass()
    : threadA(std::thread(&TestClass::processResults, this))
    , threadB(std::thread(&TestClass::postResults, this))
{}

TestClass::~TestClass() {
    threadA.join();
    threadB.join();
}

void TestClass::postResults() {
    while(true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            if(stop)
                return;
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
    }
}

void TestClass::shutDown() {
    std::unique_lock<std::mutex> lock(outQueueMutex);
    stop = true;
    outQueueConditionVariable.notify_one();
}

std::string TestClass::getResult() {
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(!stop && outQueue.empty())
            outQueueConditionVariable.wait(lock);
        if(stop)
            return result;
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true) {
        const auto result = getResult();
        if(result.empty())
            break;
        std::cout << result << std::endl;
    }
}

int main() {
    // Block signals in all threads.
    sigset_t sigset;
    sigfillset(&sigset);
    ::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

    TestClass testClass;

    std::thread signal_thread([&testClass]() {
        // Unblock signals in this thread only.
        sigset_t sigset;
        sigfillset(&sigset);
        int signo = ::sigwaitinfo(&sigset, nullptr);
        if(-1 == signo)
            std::abort();

        std::cout << "Received signal " << signo << '\n';
        testClass.shutDown();
    });

    signal_thread.join();
}