访问 pthread shared std:map 而没有数据竞争
Access pthread shared std:map without data race
我的场景是有一个主线程和几十个工作线程。工作线程将处理来自不同端口的传入消息。
我想要做的是让主线程和工作线程共享同一个映射,工作线程将数据保存到映射中(在不同的桶中)。并且主线程定期grep地图内容。
代码如下:
struct cStruct
{
std::map<string::string> map1;
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
};
int main(){
struct cStruct cStruct1;
while (condition){
pthread_t th;
int th_rc=pthread_create(&th,NULL,&task,(void *) &cStruct1);
}
}
void* task(void* arg){
struct cStruct cs = * (struct cStruct*) arg;
while (coming data){
if (main_thread_work){
pthread_cond_wait(&count_cond, &cs.mutex1)
}
pthread_mutex_lock(&cs.mutex1);
// add a new bucket to the map
cs.map1(thread_identifier)=processed_value;
pthread_mutex_unlock(&cs.mutex1);
}
void* main_thread_task(void* arg){
sleep(sleep_time);
main_thread_work = true;
pthread_mutex_lock(&cs.mutex1);
// main_thread reads the std::map
main_thread_work = false;
pthread_cond_broadcast(&count_cond, &cs.mutex1)
pthread_mutex_unlock(&cs.mutex1);
}
我的问题是:
对于地图大小的改变,我应该使用锁来保护地图。
但是对于某个key update的map,是否可以让不同线程同时修改map呢? (假设不会同时访问两个相同的 map 桶)
对于主线程 grep 地图,我想在主线程 grep 地图内容时使用条件等待来保持所有工作线程,然后执行 pthread_cond_broadcast 唤醒。问题是,如果主线程开始工作时工作线程正在更新地图,则会出现数据竞争。
请分享一些想法来帮助我改进我的设计。
编辑 1:
添加 main_thread_task()。
我想避免的事情是工作线程到达 pthread_cond_wait "after" pthread_cond_broadcast 并且逻辑出错。
所以我在主线程广播工作线程之前伪造了 main_thread_work。
您应该在每次访问地图(在您的情况下)时使用 mutex
锁定机制,而不仅仅是在添加新的 'bucket' 时。如果 T1 尝试在 T2 插入新桶时向映射写入一些值,则 T1 使用的 pointer/iterator 将变为无效。
关于pthread_cond_wait
。如果其他线程所做的唯一事情只是修改地图,它可能会完成这项工作。如果他们执行其他计算或处理一些非共享数据,最好使用相同的 mutex
来保护对地图的访问并让其他线程完成他们的工作,这可能与共享地图无关.
while (coming data){
if (main_thread_work){
pthread_cond_wait(&count_cond, &cs.mutex1)
}
pthread_mutex_lock(&cs.mutex1);
这显然不对。你不能检查 main_thread_work
除非你持有保护它的锁。对 pthread_cond_wait
的调用如何释放它不持有的锁?!
这应该是这样的:
void* task(void* arg){
struct cStruct cs = * (struct cStruct*) arg;
// Acquire the lock so we can check shared state
pthread_mutex_lock(&cs.mutex1);
// Wait until the shared state is what we need it to be
while (main_thread_work)
pthread_cond_wait(&count_cond, &cs.mutex1)
// Do whatever it is we're supposed to do when the
// shared state is in this state
cs.map1(thread_identifier)=processed_value;
// release the lock
pthread_mutex_unlock(&cs.mutex1);
}
我的场景是有一个主线程和几十个工作线程。工作线程将处理来自不同端口的传入消息。
我想要做的是让主线程和工作线程共享同一个映射,工作线程将数据保存到映射中(在不同的桶中)。并且主线程定期grep地图内容。
代码如下:
struct cStruct
{
std::map<string::string> map1;
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
};
int main(){
struct cStruct cStruct1;
while (condition){
pthread_t th;
int th_rc=pthread_create(&th,NULL,&task,(void *) &cStruct1);
}
}
void* task(void* arg){
struct cStruct cs = * (struct cStruct*) arg;
while (coming data){
if (main_thread_work){
pthread_cond_wait(&count_cond, &cs.mutex1)
}
pthread_mutex_lock(&cs.mutex1);
// add a new bucket to the map
cs.map1(thread_identifier)=processed_value;
pthread_mutex_unlock(&cs.mutex1);
}
void* main_thread_task(void* arg){
sleep(sleep_time);
main_thread_work = true;
pthread_mutex_lock(&cs.mutex1);
// main_thread reads the std::map
main_thread_work = false;
pthread_cond_broadcast(&count_cond, &cs.mutex1)
pthread_mutex_unlock(&cs.mutex1);
}
我的问题是:
对于地图大小的改变,我应该使用锁来保护地图。 但是对于某个key update的map,是否可以让不同线程同时修改map呢? (假设不会同时访问两个相同的 map 桶)
对于主线程 grep 地图,我想在主线程 grep 地图内容时使用条件等待来保持所有工作线程,然后执行 pthread_cond_broadcast 唤醒。问题是,如果主线程开始工作时工作线程正在更新地图,则会出现数据竞争。
请分享一些想法来帮助我改进我的设计。
编辑 1: 添加 main_thread_task()。 我想避免的事情是工作线程到达 pthread_cond_wait "after" pthread_cond_broadcast 并且逻辑出错。
所以我在主线程广播工作线程之前伪造了 main_thread_work。
您应该在每次访问地图(在您的情况下)时使用 mutex
锁定机制,而不仅仅是在添加新的 'bucket' 时。如果 T1 尝试在 T2 插入新桶时向映射写入一些值,则 T1 使用的 pointer/iterator 将变为无效。
关于pthread_cond_wait
。如果其他线程所做的唯一事情只是修改地图,它可能会完成这项工作。如果他们执行其他计算或处理一些非共享数据,最好使用相同的 mutex
来保护对地图的访问并让其他线程完成他们的工作,这可能与共享地图无关.
while (coming data){
if (main_thread_work){
pthread_cond_wait(&count_cond, &cs.mutex1)
}
pthread_mutex_lock(&cs.mutex1);
这显然不对。你不能检查 main_thread_work
除非你持有保护它的锁。对 pthread_cond_wait
的调用如何释放它不持有的锁?!
这应该是这样的:
void* task(void* arg){
struct cStruct cs = * (struct cStruct*) arg;
// Acquire the lock so we can check shared state
pthread_mutex_lock(&cs.mutex1);
// Wait until the shared state is what we need it to be
while (main_thread_work)
pthread_cond_wait(&count_cond, &cs.mutex1)
// Do whatever it is we're supposed to do when the
// shared state is in this state
cs.map1(thread_identifier)=processed_value;
// release the lock
pthread_mutex_unlock(&cs.mutex1);
}