单个 reader 多个具有 pthreads 和锁但没有提升的编写器

Single reader multiple writers with pthreads and locks and without boost

考虑下一段代码。

#include <iostream>
#include <vector>
#include <map>

using namespace std;

map<pthread_t,vector<int>> map_vec;
vector<pair<pthread_t ,int>> how_much_and_where;

pthread_cond_t CV = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* writer(void* args)
{
    while(*some condition*)
    {
        int howMuchPush = (rand() % 5) + 1;
        for (int i = 0; i < howMuchPush; ++i)
        {
            // WRITE
            map_vec[pthread_self()].push_back(rand() % 10);
        }

        how_much_and_where.push_back(make_pair(pthread_self(), howMuchPush));
        // Wake up the reader - there's something to read.
        pthread_cond_signal(&CV);
    }

    cout << "writer thread: " <<  pthread_self()  << endl;
    return nullptr;
}

void* reader(void* args) {

    pair<pthread_t, int> to_do;

    pthread_cond_wait(&CV, &mutex);
    while(*what condition??*)
    {
        to_do = how_much_and_where.front();
        how_much_and_where.erase(how_much_and_where.begin());

        // READ
        cout << to_do.first << " wrote " << endl;
        for (int i = 0; i < to_do.second; i++)
        {
            cout << map_vec[to_do.first][i] << endl;
        }

        // Done reading. Go to sleep.
        pthread_cond_wait(&CV, &mutex);
    }

    return nullptr;
}

//----------------------------------------------------------------------------//


int main()
{
    pthread_t threads[4];

    // Writers
    pthread_create(&threads[0], nullptr, writer, nullptr);
    pthread_create(&threads[1], nullptr, writer, nullptr);
    pthread_create(&threads[2], nullptr, writer, nullptr);
    // reader
    pthread_create(&threads[4], nullptr, reader, nullptr);


    pthread_join(threads[0], nullptr);
    pthread_join(threads[1], nullptr);
    pthread_join(threads[2], nullptr);
    pthread_join(threads[3], nullptr);

    return 0;
}

背景

每个writer都有自己的容器来写入数据。 并假设有一个 reader 知道写入器何时完成数据块的写入,以及该块的大小是多少(reader 有一个容器,写入器将成对的数据写入其中)。

问题

为了简化事情,我们应该将 general-purpose/reusable 生产者-消费者队列(或者我通常称之为 "blocking queue" )的实现与实际生产者和消费者的实施(不是 general-purpose/reusable - 它们特定于您的程序)。从设计的角度来看,这将使代码更加清晰和易于管理。

1。实现通用(可重用)阻塞队列

首先你应该实现一个 "blocking queue" 可以管理多个生产者和一个消费者。这个阻塞队列将包含处理 multithreading/synchronization 的代码,消费者线程可以使用它从多个生产者线程接收项目。这样的阻塞队列可以通过多种不同的方式实现(不仅是使用 mutex+cond 组合),具体取决于您是否有 1 个或多个消费者和 1 个或多个生产者(有时可以引入不同类型的 [平台特定] 优化当你只有 1 个消费者或 1 个生产者时)。如果需要,使用 mutex+cond 对的最简单队列实现可以自动处理多个生产者和多个消费者。

队列只有一个内部容器(它可以是一个非线程安全的 std::queue、向量或列表)来保存项目和一个相关的互斥锁+条件对来保护这个容器免受并发访问多个线程。队列必须提供两个操作:

  • produce(item):将一项放入队列并立即returns。伪代码如下所示:

    1. 锁定互斥体
    2. 将新项目添加到内部容器
    3. 通过 cond
    4. 发出信号
    5. 解锁互斥锁
    6. return
  • wait_and_get():如果队列中至少有一个项目,则它会删除最旧的项目并立即删除 returns,否则它会等待直到有人将项目放入具有 produce(item) 操作的队列。

    1. 锁定互斥锁
    2. 如果容器是空的:

      1. 等待条件 (pthread_cond_wait)
    3. 删除最旧的项目

    4. 解锁互斥锁
    5. return 删除的最旧项目

2。使用阻塞队列实现你的程序

现在您有了一个可重用的阻塞队列来构建我们可以实现生产者和消费者以及控制事物的主线程。

生产者

他们只是将一堆项目扔进队列(通过调用阻塞队列的 produce(item))然后他们退出。如果项目的生产不是计算量大或不需要等待大量 IO 操作,那么这将在您的示例程序中很快完成。为了模拟线程做繁重工作的真实世界场景,您可以执行以下操作:在每个生产者线程上,您只将 X(比如说 5)个项目放入队列,但在每个项目之间等待随机秒数1 到 3 秒之间。请注意,一段时间后,您的生产者线程在完成工作后会自行退出。

消费者

消费者有一个无限循环,在这个循环中它总是使用 wait_and_get() 从队列中获取下一个项目并以某种方式处理它。如果它是一个发出处理结束信号的特殊项目,那么它会跳出无限循环而不是处理该项目。伪代码:

  1. 无限循环:

    1. 从队列中获取下一项 (wait_and_get())
    2. 如果这是指示处理结束的特殊项,则跳出循环...
    3. 否则让我们处理这个项目

主线程

  1. 以任意顺序启动包括生产者和消费者在内的所有线程。
  2. 等待所有生产者线程完成(pthread_join() 个线程)。

    请记住,生产者在没有外部刺激的情况下会在一段时间后自行完成并退出。当您完成加入所有生产者时,这意味着每个生产者都已退出,因此没有人会再次调用队列的 produce(item) 操作。然而,队列可能仍有未处理的项目,消费者可能仍在处理这些项目。

  3. 将最后一个特殊 "end of processing" 项目放入消费者队列。

    当消费者处理完生产者生产的最后一个项目时,它仍会使用 wait_and_get() 向队列询问下一个项目 - 这可能会导致死锁,因为等待下一个项目永远不会到达.为了在主线程上实现这一点,我们将最后一个特殊项目放入队列中,向消费者发出处理结束的信号。请记住,我们的消费者实现包含对这个特殊项目的检查,以确定何时完成处理。重要的是,只有在生产者完成后(加入他们之后),才必须将此特殊项目放入主线程的队列中!

    如果你有多个消费者,那么将多个特殊的 "end of processing" 项目放入队列(每个消费者 1 个)比让队列更智能以能够处理多个消费者更容易,只有 1 [=141] =] 项。由于主线程协调了整个事情(线程创建、线程加入等),它确切地知道消费者的数量,因此很容易将相同数量的 "end of processing" 项放入队列。

  4. 等待消费者线程通过加入它来终止。

    将处理结束的特殊项目放入队列后,我们等待消费者线程处理剩余的项目(由生产者生产)以及我们的最后一个特殊项目(由主生产)"coordinator" thread) 要求消费者完成。我们通过 pthread_join() 在消费者线程中等待主线程。

补充说明:

  • 在我的线程系统实现中,阻塞队列的项目通常是指针 - 指向必须是 executed/processed 的 "job" 对象的指针。 (您可以将阻塞队列实现为模板 class,在这种情况下,阻塞队列的用户可以指定项的类型)。在我的例子中,很容易将一个特殊的 "end of processing" 项目放入队列中供消费者使用:为此我通常使用一个简单的 NULL 作业指针。在您的情况下,您将必须找出可以在队列中使用哪种特殊值来表示消费者处理结束。
  • 生产者可能有自己的队列和一大堆其他数据结构,他们可以使用这些数据结构"produce items",但消费者并不关心这些数据结构。消费者只关心通过自己的阻塞队列接收到的单个项目。如果生产者想从消费者那里得到一些东西,那么它必须通过队列向消费者发送一个项目("job")。阻塞队列实例属于消费者线程——它在任意线程和消费者线程之间提供了一种单向通信通道。甚至消费者线程本身也可以将一个项目放入自己的队列中(在某些情况下这很有用)。
  • pthread_cond_wait 文档说这个函数可以在没有实际信号的情况下唤醒(尽管我一生中从未见过一个由这个函数的虚假唤醒引起的错误)。为此,代码的 if container is empty then pthread_cond_wait 部分应替换为 while the container is empty pthread_cond_wait 但同样,这个虚假的唤醒事件可能是只存在于某些具有特定 linux 实现的架构上的湖水怪兽线程原语,因此您的代码可能会在台式机上运行而无需关心这个问题。