实现用于收集数据流和处理的 C++ 多线程的标准方法

Sandard way of implementing c++ multi-threading for collecting data streams and processing

我是 C++ 开发的新手。我正在尝试 运行 个相互独立的无限函数。 问题陈述与此类似:

我尝试实现的方式是

#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <mutex>

int g_i = 0;
std::mutex g_i_mutex; // protects g_i

// increment g_i by 1
void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

void *fun(void *s)
{
  std::string str;
  str = (char *)s;
  std::cout << str << " start\n";
  while (1)
  {
    std::cout << str << " " << g_i << "\n";
    if(g_i > 1000) break;
    increment_itr();
  }
  pthread_exit(NULL);
  std::cout << str << " end\n";
}

void *checker(void *s) {
  while (1) {
    if(g_i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i == 100\n";
      std::cout<<"**********************\n";
      pthread_exit(NULL);
    }
  }
}


int main()
{
  int itr = 0;
  pthread_t threads[3];
  pthread_attr_t attr;
  void *status;

  // Initialize and set thread joinable
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  int rc1 = pthread_create(&threads[0], &attr, fun, (void *)&"foo");
  int rc2 = pthread_create(&threads[1], &attr, fun, (void *)&"bar");
  int rc3 = pthread_create(&threads[2], &attr, checker, (void *)&"checker");
  

  if (rc1 || rc2 || rc3)
  {
    std::cout << "Error:unable to create thread," << rc1 << rc2 << rc3 << std::endl;
    exit(-1);
  }

  pthread_attr_destroy(&attr);

  std::cout << "main func continues\n";

  for (int i = 0; i < 3; i++)
  {
    rc1 = pthread_join(threads[i], &status);
    if (rc1)
    {
      std::cout << "Error:unable to join," << rc1 << std::endl;
      exit(-1);
    }
    std::cout << "Main: completed thread id :" << i;
    std::cout << "  exiting with status :" << status << std::endl;
  }

  std::cout << "main end\n";

  return 0;
}

这行得通,但我想知道此实现是否是执行此操作的标准方法,或者可以用任何更好的方法来完成此操作?

您正确地在 increment_itr 中获取了锁,但是您的 fun 函数在未获取锁的情况下访问 g_i

改变这个:

void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

对此

int increment_itr()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);  // the const wasn't actually needed
  g_i = g_i + 1;
  return g_i;  // return the updated value of g_i
}

这不是线程安全的:

if(g_i > 1000) break;  // access g_i without acquiring the lock
increment_itr();

这个更好:

if (increment_itr() > 1000) {
    break;
}

checker 中需要类似的修复:


void *checker(void *s) {
  while (1) {
    int i;
    {
      std::lock_guard<std::mutex> lock(g_i_mutex);
      i = g_i;
    }
    if(i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i == 100\n";
      std::cout<<"**********************\n";
      break;
    }
    return NULL;
  }

关于你的设计问题。这是根本问题。

您正在提议一个专用线程,该线程不断获取锁并对数据结构进行某种排序检查。如果满足某个条件,它会做一些额外的处理,比如写入数据库。如果数据结构(两个映射)中的任何内容都没有改变,那么在无限循环中旋转的线程将是一种浪费。相反,您只希望在某些情况发生变化时对 运行 进行完整性检查。您可以使用 condition variable 让检查器线程暂停,直到发生某些实际变化。

这是一个更好的设计。


uint64_t g_data_version = 0;
std::conditional_variable g_cv;

void *fun(void *s)
{
    while (true) {

        << wait for data from the source >> 

        {
            std::lock_guard<std::mutex> lock(g_i_mutex);
            // update the data in the map while under a lock
            // e.g. g_n++;
            //

            // increment the data version to signal a new revision has been made
            g_data_version += 1;
        }

        // notify the checker thread that something has changed
        g_cv.notify_all();
    }

}

然后你的检查器函数只有在它 fun 发信号告诉它发生变化时才会唤醒。

void *checker(void *s) {
  while (1) {

      // lock the mutex
      std::unique_lock<std::mutex> lock(g_i_mutex);

      // do the data comparison check here

      // now wait for the data version to change
      uint64_t version = g_data_version;
      while (version != g_data_version) { // check for spurious wake up
         cv.wait(lock); // this atomically unlocks the mutex and waits for a notify() call on another thread to happen
      }
  }
}