C++ 条件变量,用于发出分离线程执行停顿结束的信号

C++ Condition variable to signal end of detached thread execution stalls

我有一些代码,我正在处理其中生成分离线程的地方,做了一些工作,然后应该等待来自 main() 的信号,然后再将另一个信号发送回 main 指示线程已经退出了。

我对条件变量还很陌生,但是我以前使用过一些多线程代码。 (主要是互斥量。)

这是我尝试实现的,但它的行为与我预期的不同。 (我可能误解了什么。)

这背后的想法是将包含两个标志的结构传递给每个分离的线程。第一个标志表示 main() 表示“可以退出,并退出线程函数的末尾”。第二个标志由线程本身设置,并向 main() 发出线程确实已退出的信号。 (这只是为了确认来自 main() 的信号接收正常并发回一些东西。)

#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>

struct ThreadStruct
{

    int id;

    std::condition_variable cv;
    std::mutex m;

    int ok_to_exit;
    int exit_confirm;

};



void Pause()
{
    std::cout << "Press enter to continue" << std::endl;
    std::cin.get();
}


void detachedThread(ThreadStruct* threadData)
{
    std::cout << "START: Detached Thread " << threadData->id << std::endl;
    
    // Performs some arbitrary amount of work.
    for(int i = 0; i < 100000; ++ i);

    std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
    
    std::unique_lock<std::mutex> lock(threadData->m);
    std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
    threadData->cv.wait(lock, [threadData]{return threadData->ok_to_exit == 1;});
    std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
    threadData->exit_confirm = 1;

}

int main(int argc, char** argv)
{
    
    int totalThreadCount = 1;

    ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
    std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;

    for(int i = totalThreadCount - 1; i >= 0; --i)
    {
        perThreadData[i].id = i;
        perThreadData[i].ok_to_exit = 0;
        perThreadData[i].exit_confirm = 0;
                
        std::thread t(detachedThread, &perThreadData[i]);
        t.detach();
            
    }


    for(int i{0}; i < totalThreadCount; ++i)
    {
        ThreadStruct *threadData = &perThreadData[i];
        
        std::cout << "Waiting for lock - main() thread" << std::endl;
        std::unique_lock<std::mutex> lock(perThreadData[i].m);
        std::cout << "Lock obtained - main() thread" << std::endl;
        perThreadData[i].cv.wait(lock);

        threadData->ok_to_exit = 1;

        // added after comment from Sergey
        threadData->cv.notify_all(); 

        std::cout << "Done - main() thread" << std::endl;
        
    }

    for(int i{0}; i < totalThreadCount; ++i)
    {
        std::size_t thread_index = i;
        ThreadStruct& threadData = perThreadData[thread_index];
        
        std::unique_lock<std::mutex> lock(threadData.m);
        std::cout << "i=" << i << std::endl;
        int &exit_confirm = threadData.exit_confirm;
        threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
        std::cout << "i=" << i << " finished!" << std::endl;
    }

    Pause();

    return 0;
}

这运行到行:

WAIT: Detached thread 0

但是分离的线程永远不会退出。我做错了什么?

编辑:进一步实验 - 这有帮助吗?

我认为通过删除一个步骤来简化事情可能会有所帮助。在下面的示例中,main() 不会向分离线程发送信号 ,它只是等待来自 分离线程的信号。

但同样,此代码在打印 DROP 后挂起...这意味着分离的线程正常退出,但 main() 不知道。

#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>

struct ThreadStruct
{

    int id;

    std::condition_variable cv;
    std::mutex m;

    int ok_to_exit;
    int exit_confirm;

};



void Pause()
{
    std::cout << "Press enter to continue" << std::endl;
    std::cin.get();
}


void detachedThread(ThreadStruct* threadData)
{
    std::cout << "START: Detached Thread " << threadData->id << std::endl;
    
    // Performs some arbitrary amount of work.
    for(int i = 0; i < 100000; ++ i);

    std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
    
    std::unique_lock<std::mutex> lock(threadData->m);
    
    std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
    threadData->exit_confirm = 1;

    threadData->cv.notify_all();
    std::cout << "DROP" << std::endl;

}

int main(int argc, char** argv)
{
    
    int totalThreadCount = 1;

    ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
    std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;

    for(int i = totalThreadCount - 1; i >= 0; --i)
    {
        perThreadData[i].id = i;
        perThreadData[i].ok_to_exit = 0;
        perThreadData[i].exit_confirm = 0;
                
        std::thread t(detachedThread, &perThreadData[i]);
        t.detach();
            
    }

    for(int i{0}; i < totalThreadCount; ++i)
    {
        std::size_t thread_index = i;
        ThreadStruct& threadData = perThreadData[thread_index];
        
        std::cout << "Waiting for mutex" << std::endl;
        std::unique_lock<std::mutex> lock(threadData.m);
        std::cout << "i=" << i << std::endl;
        int &exit_confirm = threadData.exit_confirm;
        threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
        std::cout << "i=" << i << " finished!" << std::endl;
    }

    Pause();

    return 0;
}

您的 lambda 正在捕获 by-value,因此它永远不会看到对 exit_confim.

所做的更改

捕获 by-reference 改为:

int& exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [&exit_confirm] { return exit_confirm == 1; });
//                        ^
//                        | capture by-reference

您还需要delete[]new[]所做的

delete[] ThreadStruct;

当您完成 structs 时。


我还注意到在 free 之后有一些堆使用,但是当我对代码进行一些简化时,它神奇地消失了。我没有进一步调查。

一些建议:

  • 将处理 ThreadStruct 成员变量和锁的代码移至 ThreadStruct class。它通常使阅读和维护更简单。
  • 删除未使用的变量和headers。
  • 不要使用 new[]/delete[]。对于此示例,您可以改用 std::vector<ThreadStruct>
  • 根本不要 detach() - 我在下面没有做任何事情,但我建议使用 join() (在附加线程上)进行最后的同步。这就是它的用途。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct ThreadStruct {
    int id;

    // move this function into the ThreadStruct class
    void detachedThread() {
        std::cout << "START: Detached Thread " << id << std::endl;
        // Performs some arbitrary amount of work (optimized away here)
        std::cout << "FINISH: Detached thread " << id << std::endl;

        std::lock_guard<std::mutex> lock(m);

        std::cout << "EXIT: Detached thread " << id << std::endl;
        exit_confirm = 1;

        cv.notify_all();
        std::cout << "DROP" << std::endl;
    }

    // add support functions instead of doing these things in your normal code
    void wait_for_exit_confirm() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return exit_confirm == 1; });
    }

    void spawn_detached() {
        std::thread(&ThreadStruct::detachedThread, this).detach();
    }

private:
    std::condition_variable cv;
    std::mutex m;

    int exit_confirm = 0;           // initialize
};

有了上面的内容,main变得更干净了:

int main() {
    int totalThreadCount = 1;

    std::vector<ThreadStruct> perThreadData(totalThreadCount);

    std::cout << "Main thread starting " << perThreadData.size() << " thread(s)\n";

    int i = 0;
    for(auto& threadData : perThreadData) {
        threadData.id = i++;
        threadData.spawn_detached();
    }

    for(auto& threadData : perThreadData) {
        std::cout << "Waiting for mutex" << std::endl;
        std::cout << "i=" << threadData.id << std::endl;
        threadData.wait_for_exit_confirm();
        std::cout << "i=" << threadData.id << " finished!" << std::endl;
    }

    std::cout << "Press enter to continue" << std::endl;
    std::cin.get();
}

为了将来的兴趣:修复了问题中发布的原始 MWE。有两个问题

  • 没有通过引用在 lambda 中捕获局部变量(参见其他答案)

  • 1 次 wait() 调用过多

      #include <cstdlib> // std::atoi
      #include <iostream>
      #include <thread>
      #include <vector>
      #include <random>
      #include <future>
      #include <condition_variable>
      #include <mutex>
    
      struct ThreadStruct
      {
    
          int id;
    
          std::condition_variable cv;
          std::mutex m;
    
          int ok_to_exit;
          int exit_confirm;
    
      };
    
    
    
      void Pause()
      {
          std::cout << "Press enter to continue" << std::endl;
          std::cin.get();
      }
    
    
      void detachedThread(ThreadStruct* threadData)
      {
          std::cout << "START: Detached Thread " << threadData->id << std::endl;
    
          // Performs some arbitrary amount of work.
          for (int i = 0; i < 100000; ++i);
    
          std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
    
          std::unique_lock<std::mutex> lock(threadData->m);
          std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
          threadData->cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
          std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
          threadData->exit_confirm = 1;
    
          threadData->cv.notify_all();
          std::cout << "DROP" << std::endl;
    
      }
    
      int main(int argc, char** argv)
      {
    
          int totalThreadCount = 1;
    
          ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
          std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
    
          for (int i = totalThreadCount - 1; i >= 0; --i)
          {
              perThreadData[i].id = i;
              perThreadData[i].ok_to_exit = 0;
              perThreadData[i].exit_confirm = 0;
    
              std::thread t(detachedThread, &perThreadData[i]);
              t.detach();
    
          }
    
          for(int i{0}; i < totalThreadCount; ++ i)
          {
              ThreadStruct *threadData = &perThreadData[i];
    
              std::cout << "Waiting for lock - main() thread" << std::endl;
              std::unique_lock<std::mutex> lock(perThreadData[i].m);
              std::cout << "Lock obtained - main() thread" << std::endl;
              //perThreadData[i].cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
    
              std::cout << "Wait complete" << std::endl;
              threadData->ok_to_exit = 1;
              threadData->cv.notify_all();
              std::cout << "Done - main() thread" << std::endl;
    
          }
    
          for (int i{ 0 }; i < totalThreadCount; ++i)
          {
              std::size_t thread_index = i;
              ThreadStruct& threadData = perThreadData[thread_index];
    
              std::cout << "Waiting for mutex" << std::endl;
              std::unique_lock<std::mutex> lock(threadData.m);
              std::cout << "i=" << i << std::endl;
              int& exit_confirm = threadData.exit_confirm;
              threadData.cv.wait(lock, [&exit_confirm] {return exit_confirm == 1; });
              std::cout << "i=" << i << " finished!" << std::endl;
          }
    
          Pause();
    
          return 0;
      }