使用 pthread 库的线程间同步

Synchronization between threads using pthread library

比如我们有5条数据。(假设我们有很多space,不同版本的数据不会互相重叠。)

数据 0、数据 1、数据 2、数据 3、数据 4。

我们有 3 个线程(少于 5 个)处理这些数据。

处理 DATA1(版本 0)的线程 1 已从 DATA0(版本 0)和 DATA2(版本 0)访问了一些数据,并创建了 DATA1(版本 1)。

处理 DATA3(版本 0)的线程 2 已从 DATA2(版本 0)和 DATA4(版本 0)访问了一些数据,并创建了 DATA3(版本 1)。

处理 DATA2(版本 0)的线程 3 已从 DATA1(版本 0)和 DATA3(版本 0)访问了一些数据,并创建了 DATA2(版本 1)。

现在,如果线程 1 先完成。它有多种选择,它可以在 DATA0(创建 DATA0 版本 1)上工作,因为 DATA1(版本 0)和 DATA4(版本 0)可用(假设 DATA0 和 DATA4 是邻居)。如果它发现 DATA1(version1) 和 DATA3(version1) 都可用并创建 DATA2(version 2),它也可以在 DATA 2 上工作。

要求是下一个版本的数据可以在它的邻居数据准备好后处理(在 1 个较低的版本中)。

最后,我希望当所有数据到达版本 10 时,所有线程都退出。

问题:如何使用pthread库实现这个方案

注意:我想同时拥有不同版本的数据,因此创建屏障并确保所有数据都达到同一版本不是一种选择。

让我们讨论实施。要存储所有版本 (0~10),我们需要 5*11*sizeof(data) space。让我们创建两个大小为 5 x 11 的数组。第一个数组是 DATA,使得 DATA[i][j] 是数据 i 的第 j 个版本。第二个数组是一个'Access Matrix' - A,它表示索引的状态,它可以是:

  • 未开始
  • 进行中
  • 完成

算法:每个线程将在矩阵中搜索索引 [i][j],使得索引 [i-1][j-1] 和 [i+1][j-1] 是 'Completed'。它会在处理它时将 A[i][j] 设置为 'In Progress' 。如果i=0,i-1指的是n-1,如果i=n-1,i+1指的是0。(像一个循环队列)。当最后一列中的所有条目都是 'Completed' 时,线程终止。否则它搜索未完成的新数据。

使用pthread库实现:

重要变量:互斥量、条件变量。

pthread_mutex_t互斥=PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t condvar= PTHREAD_COND_INITIALIZER;

互斥锁是一个'lock'。当我们需要进行操作atomic时,我们会使用它。原子操作是指需要在不中断执行的情况下一步完成的操作。 'condvar' 是一个条件变量。使用它,线程可以休眠直到达到条件,当达到条件时,线程被唤醒。这避免了使用循环忙等待

这里,我们的atomic操作正在更新A。原因:如果线程同时更新A,可能会导致竞争条件,例如超过1个线程在处理一个Data in并行。

为了实现这一点,我们搜索并在 内部设置 A 锁。设置 A 后,我们释放锁并处理数据。但是,如果找不到可以处理的可用数据,我们将等待条件变量 - condvar。当我们在 condvar 上调用 wait 时,我们也传递了互斥量。在锁内时,等待函数释放互斥锁并等待条件变量发出信号。一旦收到信号,它就需要锁定并继续执行。等待进程处于休眠状态,因此不会浪费 CPU 时间。

每当任何线程完成对一条数据的处理时,它可能会准备 1 个或多个其他样本以供处理。因此,在一个线程完成工作后,它会通知所有其他线程在继续算法之前检查 'workable' 数据。伪代码如下:

阅读评论和函数名称。他们详细描述了 pthread 库的工作。虽然使用 gcc 编译时添加 -lpthread 标志并查找这些函数的手册页以获取库的更多详细信息,但这些功能已经足够了。

void thread(void)
{
    //Note: If there are various threads in the line pthread_mutex_lock(&mutex)
    // each will wait till the lock is released and acquired. Until then it will sleep.
    pthread_mutex_lock(&mutex); //Do the searching inside the lock
    while(lastColumnNotDone){   //This will avoid previously searched indices being updated
    //Search for a workable index
        if(found)
        {   //As A has been updated and set to in progress, no need to hold lock. As we start work on the thread we release the lock so other process might use it.
            pthread_mutex_unlock(&mutex); //Note:
            //WORK ON DATA
            pthread_mutex_lock(&mutex); //Restore lock to in order to continue thread's execution safely.
            pthread_cond_broadcast(&condvar); //Sends a wake up signal to all threads which are waiting for the conditional variable 'condvar'.
        }
        else //No executable data found
            pthread_cond_wait(&condvar,&mutex); //While waiting, we pass the address of mutex as second parameter to wait function. This releases the lock on mutex while this function is waiting and tries to reacquire it once condvar is signaled.
    }
    pthread_mutex_unlock(&mutex);
}

在while循环条件下搜索和检查是否所有数据都完成可以优化,但这是一个不同的算法问题。这里的关键思想是使用 pthread 库和线程概念。

  • A是一个公共访问矩阵。不要在锁外更新它。
  • 在检查与 A 有关的任何事情时,例如查找进程或检查所有数据是否已完成,必须持有锁。否则 A 可以在一个线程读取它的同时被另一个线程更改。
  • 我们使用函数 pthread_mutex_lock 和 pthread_mutex_unlock 获取和释放锁。请记住,这些函数采用 pointers 互斥量而不是它的值。它是一个需要访问和更新的变量。
    • 避免长时间持有锁。这将导致线程等待很长时间才能满足较小的访问需求。
    • 调用wait时,确保持有锁。 Wait 解锁在等待期间作为第二个参数传递的互斥量。收到唤醒信号后,它会再次尝试获取锁。