访问 pthread shared std:map 而没有数据竞争

Access pthread shared std:map without data race

我的场景是有一个主线程和几十个工作线程。工作线程将处理来自不同端口的传入消息。

我想要做的是让主线程和工作线程共享同一个映射,工作线程将数据保存到映射中(在不同的桶中)。并且主线程定期grep地图内容。

代码如下:

struct cStruct
{
    std::map<string::string> map1; 
    pthread_mutex_t mutex1;
    pthread_mutex_t mutex2;  
};

int main(){

    struct cStruct cStruct1;   
    while (condition){
        pthread_t th;
        int th_rc=pthread_create(&th,NULL,&task,(void *) &cStruct1);
    }

}

void* task(void* arg){
    struct cStruct cs = * (struct cStruct*) arg;

    while (coming data){
        if (main_thread_work){
            pthread_cond_wait(&count_cond, &cs.mutex1)
        }  

        pthread_mutex_lock(&cs.mutex1);
        // add a new bucket to the map
        cs.map1(thread_identifier)=processed_value;
        pthread_mutex_unlock(&cs.mutex1);

    }

void* main_thread_task(void* arg){

    sleep(sleep_time);
    main_thread_work = true;
    pthread_mutex_lock(&cs.mutex1);
    // main_thread reads the std::map
    main_thread_work = false;
    pthread_cond_broadcast(&count_cond, &cs.mutex1)
    pthread_mutex_unlock(&cs.mutex1);    
}

我的问题是:

对于地图大小的改变,我应该使用锁来保护地图。 但是对于某个key update的map,是否可以让不同线程同时修改map呢? (假设不会同时访问两个相同的 map 桶)

对于主线程 grep 地图,我想在主线程 grep 地图内容时使用条件等待来保持所有工作线程,然后执行 pthread_cond_broadcast 唤醒。问题是,如果主线程开始工作时工作线程正在更新地图,则会出现数据竞争。

请分享一些想法来帮助我改进我的设计。

编辑 1: 添加 main_thread_task()。 我想避免的事情是工作线程到达 pthread_cond_wait "after" pthread_cond_broadcast 并且逻辑出错。

所以我在主线程广播工作线程之前伪造了 main_thread_work。

您应该在每次访问地图(在您的情况下)时使用 mutex 锁定机制,而不仅仅是在添加新的 'bucket' 时。如果 T1 尝试在 T2 插入新桶时向映射写入一些值,则 T1 使用的 pointer/iterator 将变为无效。

关于pthread_cond_wait。如果其他线程所做的唯一事情只是修改地图,它可能会完成这项工作。如果他们执行其他计算或处理一些非共享数据,最好使用相同的 mutex 来保护对地图的访问并让其他线程完成他们的工作,这可能与共享地图无关.

while (coming data){
    if (main_thread_work){
        pthread_cond_wait(&count_cond, &cs.mutex1)
    }  

    pthread_mutex_lock(&cs.mutex1);

这显然不对。你不能检查 main_thread_work 除非你持有保护它的锁。对 pthread_cond_wait 的调用如何释放它不持有的锁?!

这应该是这样的:

void* task(void* arg){
    struct cStruct cs = * (struct cStruct*) arg;

    // Acquire the lock so we can check shared state
    pthread_mutex_lock(&cs.mutex1);

    // Wait until the shared state is what we need it to be
    while (main_thread_work)
        pthread_cond_wait(&count_cond, &cs.mutex1)

    // Do whatever it is we're supposed to do when the
    // shared state is in this state
    cs.map1(thread_identifier)=processed_value;

    // release the lock
    pthread_mutex_unlock(&cs.mutex1);
}