实现用于收集数据流和处理的 C++ 多线程的标准方法
Sandard way of implementing c++ multi-threading for collecting data streams and processing
我是 C++ 开发的新手。我正在尝试 运行 个相互独立的无限函数。
问题陈述与此类似:
我尝试实现的方式是
#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <mutex>
int g_i = 0;
std::mutex g_i_mutex; // protects g_i
// increment g_i by 1
void increment_itr()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
g_i += 1;
}
void *fun(void *s)
{
std::string str;
str = (char *)s;
std::cout << str << " start\n";
while (1)
{
std::cout << str << " " << g_i << "\n";
if(g_i > 1000) break;
increment_itr();
}
pthread_exit(NULL);
std::cout << str << " end\n";
}
void *checker(void *s) {
while (1) {
if(g_i > 1000) {
std::cout<<"**********************\n";
std::cout << "checker: g_i == 100\n";
std::cout<<"**********************\n";
pthread_exit(NULL);
}
}
}
int main()
{
int itr = 0;
pthread_t threads[3];
pthread_attr_t attr;
void *status;
// Initialize and set thread joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int rc1 = pthread_create(&threads[0], &attr, fun, (void *)&"foo");
int rc2 = pthread_create(&threads[1], &attr, fun, (void *)&"bar");
int rc3 = pthread_create(&threads[2], &attr, checker, (void *)&"checker");
if (rc1 || rc2 || rc3)
{
std::cout << "Error:unable to create thread," << rc1 << rc2 << rc3 << std::endl;
exit(-1);
}
pthread_attr_destroy(&attr);
std::cout << "main func continues\n";
for (int i = 0; i < 3; i++)
{
rc1 = pthread_join(threads[i], &status);
if (rc1)
{
std::cout << "Error:unable to join," << rc1 << std::endl;
exit(-1);
}
std::cout << "Main: completed thread id :" << i;
std::cout << " exiting with status :" << status << std::endl;
}
std::cout << "main end\n";
return 0;
}
这行得通,但我想知道此实现是否是执行此操作的标准方法,或者可以用任何更好的方法来完成此操作?
您正确地在 increment_itr
中获取了锁,但是您的 fun
函数在未获取锁的情况下访问 g_i
。
改变这个:
void increment_itr()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
g_i += 1;
}
对此
int increment_itr()
{
std::lock_guard<std::mutex> lock(g_i_mutex); // the const wasn't actually needed
g_i = g_i + 1;
return g_i; // return the updated value of g_i
}
这不是线程安全的:
if(g_i > 1000) break; // access g_i without acquiring the lock
increment_itr();
这个更好:
if (increment_itr() > 1000) {
break;
}
checker
中需要类似的修复:
void *checker(void *s) {
while (1) {
int i;
{
std::lock_guard<std::mutex> lock(g_i_mutex);
i = g_i;
}
if(i > 1000) {
std::cout<<"**********************\n";
std::cout << "checker: g_i == 100\n";
std::cout<<"**********************\n";
break;
}
return NULL;
}
关于你的设计问题。这是根本问题。
您正在提议一个专用线程,该线程不断获取锁并对数据结构进行某种排序检查。如果满足某个条件,它会做一些额外的处理,比如写入数据库。如果数据结构(两个映射)中的任何内容都没有改变,那么在无限循环中旋转的线程将是一种浪费。相反,您只希望在某些情况发生变化时对 运行 进行完整性检查。您可以使用 condition variable 让检查器线程暂停,直到发生某些实际变化。
这是一个更好的设计。
uint64_t g_data_version = 0;
std::conditional_variable g_cv;
void *fun(void *s)
{
while (true) {
<< wait for data from the source >>
{
std::lock_guard<std::mutex> lock(g_i_mutex);
// update the data in the map while under a lock
// e.g. g_n++;
//
// increment the data version to signal a new revision has been made
g_data_version += 1;
}
// notify the checker thread that something has changed
g_cv.notify_all();
}
}
然后你的检查器函数只有在它 fun
发信号告诉它发生变化时才会唤醒。
void *checker(void *s) {
while (1) {
// lock the mutex
std::unique_lock<std::mutex> lock(g_i_mutex);
// do the data comparison check here
// now wait for the data version to change
uint64_t version = g_data_version;
while (version != g_data_version) { // check for spurious wake up
cv.wait(lock); // this atomically unlocks the mutex and waits for a notify() call on another thread to happen
}
}
}
我是 C++ 开发的新手。我正在尝试 运行 个相互独立的无限函数。 问题陈述与此类似:
我尝试实现的方式是
#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <mutex>
int g_i = 0;
std::mutex g_i_mutex; // protects g_i
// increment g_i by 1
void increment_itr()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
g_i += 1;
}
void *fun(void *s)
{
std::string str;
str = (char *)s;
std::cout << str << " start\n";
while (1)
{
std::cout << str << " " << g_i << "\n";
if(g_i > 1000) break;
increment_itr();
}
pthread_exit(NULL);
std::cout << str << " end\n";
}
void *checker(void *s) {
while (1) {
if(g_i > 1000) {
std::cout<<"**********************\n";
std::cout << "checker: g_i == 100\n";
std::cout<<"**********************\n";
pthread_exit(NULL);
}
}
}
int main()
{
int itr = 0;
pthread_t threads[3];
pthread_attr_t attr;
void *status;
// Initialize and set thread joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int rc1 = pthread_create(&threads[0], &attr, fun, (void *)&"foo");
int rc2 = pthread_create(&threads[1], &attr, fun, (void *)&"bar");
int rc3 = pthread_create(&threads[2], &attr, checker, (void *)&"checker");
if (rc1 || rc2 || rc3)
{
std::cout << "Error:unable to create thread," << rc1 << rc2 << rc3 << std::endl;
exit(-1);
}
pthread_attr_destroy(&attr);
std::cout << "main func continues\n";
for (int i = 0; i < 3; i++)
{
rc1 = pthread_join(threads[i], &status);
if (rc1)
{
std::cout << "Error:unable to join," << rc1 << std::endl;
exit(-1);
}
std::cout << "Main: completed thread id :" << i;
std::cout << " exiting with status :" << status << std::endl;
}
std::cout << "main end\n";
return 0;
}
这行得通,但我想知道此实现是否是执行此操作的标准方法,或者可以用任何更好的方法来完成此操作?
您正确地在 increment_itr
中获取了锁,但是您的 fun
函数在未获取锁的情况下访问 g_i
。
改变这个:
void increment_itr()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
g_i += 1;
}
对此
int increment_itr()
{
std::lock_guard<std::mutex> lock(g_i_mutex); // the const wasn't actually needed
g_i = g_i + 1;
return g_i; // return the updated value of g_i
}
这不是线程安全的:
if(g_i > 1000) break; // access g_i without acquiring the lock
increment_itr();
这个更好:
if (increment_itr() > 1000) {
break;
}
checker
中需要类似的修复:
void *checker(void *s) {
while (1) {
int i;
{
std::lock_guard<std::mutex> lock(g_i_mutex);
i = g_i;
}
if(i > 1000) {
std::cout<<"**********************\n";
std::cout << "checker: g_i == 100\n";
std::cout<<"**********************\n";
break;
}
return NULL;
}
关于你的设计问题。这是根本问题。
您正在提议一个专用线程,该线程不断获取锁并对数据结构进行某种排序检查。如果满足某个条件,它会做一些额外的处理,比如写入数据库。如果数据结构(两个映射)中的任何内容都没有改变,那么在无限循环中旋转的线程将是一种浪费。相反,您只希望在某些情况发生变化时对 运行 进行完整性检查。您可以使用 condition variable 让检查器线程暂停,直到发生某些实际变化。
这是一个更好的设计。
uint64_t g_data_version = 0;
std::conditional_variable g_cv;
void *fun(void *s)
{
while (true) {
<< wait for data from the source >>
{
std::lock_guard<std::mutex> lock(g_i_mutex);
// update the data in the map while under a lock
// e.g. g_n++;
//
// increment the data version to signal a new revision has been made
g_data_version += 1;
}
// notify the checker thread that something has changed
g_cv.notify_all();
}
}
然后你的检查器函数只有在它 fun
发信号告诉它发生变化时才会唤醒。
void *checker(void *s) {
while (1) {
// lock the mutex
std::unique_lock<std::mutex> lock(g_i_mutex);
// do the data comparison check here
// now wait for the data version to change
uint64_t version = g_data_version;
while (version != g_data_version) { // check for spurious wake up
cv.wait(lock); // this atomically unlocks the mutex and waits for a notify() call on another thread to happen
}
}
}