一次停止多个线程

Stopping multiple threads at once

在下面的程序中,线程等待 condition_variable_any 来确定何时停止,我错过了什么?在下面列出的程序中,线程以不可预测的方式停止;有些 调用 notify_all 之前,有些则根本不会停止。

使用的条件变量定义如下:

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

线程检查是否是时候停止如下:

std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
    quit = true;
}

主线程通知线程停止如下:

std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();

可能的输出如下:

Thread  1> Received interrupt signal at iteration 2
Thread  1> Terminate
Thread  2> Received interrupt signal at iteration 2
Thread  2> Terminate
Thread  4> Received interrupt signal at iteration 2
Thread  4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...

重现问题的完整代码下方:

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            auto quit = false;
            while (!quit)
            {
                // Fake processing time during the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 200ms 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
                    if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
                    {
                        printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
                        quit = true;
                    }
                }
            }

            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}

您的代码有 2 个问题,解决方法相同。

  1. 虚假 wake-up。如果你的 wait_for 因为 SW 而结束,你的条件就会满足,尽管实际上没有人向 wake-up/end 提出要求。
  2. 如果您的线程没有持有锁并且没有休眠并且您的主线程通知了所有线程怎么办?请注意,通知不会存储在任何地方,因此如果您错过了通知,以后就不会再收到了。所以那些错过的线程永远不会终止。

要解决这两个问题,您需要另一个标志,它会告诉您的线程工作已完成并且必须停止。

static bool stop = false;
//...
if (stop) // Instead of if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
//...
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
stop = true;
interrupt_cv.notify_all();

A condition_variable 用于在条件更改时 向线程发出信号 (即,例如 共享变量 更改时价值)。但是你的代码没有条件。您正试图将 condition_variable 本身用作退出信号,而这不是它的本意。 notify_all() 只会唤醒在那一刻积极等待 condition_variable 的线程。没有等待它的线程,因为它们正忙于做其他事情,将不会收到终止信号。但是这些线程一旦准备好等待就需要检测 条件 。所以 condition 需要更持久。这就是您的代码无法正常工作的原因。

在这种情况下,您只需将 quit 变量移动到全局范围,在 condition_variablemutex 旁边。设置 quit 变量将充当您的 条件 ,您可以向等待线程发送信号。然后,您可以使用 wait_for() 的重载版本,让您检查 quit 的当前状态(忽略虚假唤醒)。

试试像这样的东西:

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
static bool quit = false;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            while (true)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1s 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const bool signaled = interrupt_cv.wait_for(lock, std::chrono::seconds(1), [](){ return quit; });
                    if (signaled) break;
                }
            }

            printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    printf("**** Requesting all threads to stop ****\n");
    {
        std::lock_guard<std::mutex> lock(interrupt_mutex);
        quit = true;
    }
    interrupt_cv.notify_all();

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}