使用线程的多个缓冲区

multiple buffers using threads

我正在编写的多线程程序需要一些算法帮助。它基本上是 unix 中的 cp 命令,但是有一个读线程和一个写线程。我正在使用信号量进行线程同步。我有定义为

的缓冲区和线程数据结构
struct bufType {
    char buf[BUFFER_SIZE];
    int numBytes;
};

struct threadData {
    int fd;
    bufType buf;
};

和一个bufType的全局数组。我的主要代码是

int main(int argc, const char * argv[])
{
    int in, out;
    pthread_t Producer, Consumer;
    threadData producerData, consumerData;

    if (argc != 3)
    {
        cout << "Error: incorrect number of params" << endl;
        exit(0);
    }
    if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
    {
        cout << "Error: cannot open input file" << endl;
        exit(0);
    }
    if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
    {
        cout << "Cannot create output file" << endl;
        exit(0);
    }

    sem_init(&sem_empty, 0, NUM_BUFFERS);
    sem_init(&sem_full, 0, 0);

    pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
    pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);

    pthread_join(Producer, NULL);
    pthread_join(Consumer, NULL);

    return 0;
}

以及读写线程:

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    sem_wait(&sem_full);
    slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
    sem_post(&sem_empty);

    pthread_exit(0);
}

所以我的问题是在主线程中为我的 threadData 变量分配什么,以及在读写线程中为我的信号量逻辑分配什么。感谢您提供的任何帮助

您可以使用公共缓冲池,循环数组或 linked 列表。这是一个 link 到 Windows 示例的 zip 压缩包,它与您所问的类似,使用 linked 列表作为线程间消息传递系统的一部分来缓冲数据。除了互斥量、信号量和写线程的创建之外,这些函数都小而简单。 mtcopy.zip.

作为一个 windows 不使用文件描述符的人,我可能对 in 和 out 有误,但我认为这需要在您的 main 中完成才能设置 threadData 结构。

producerData.fd = in;
consumerData.fd = out;

然后为两个结构声明一个 bufType 类型的单一对象。例如,将 threadData 的定义更改为

struct threadData {
    int fd;
    bufType* buf;
};

然后在您的 Main 中,您写

bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;

然后两个线程将使用公共缓冲区。否则您将写入生产者数据缓冲区,但消费者数据缓冲区将保持为空(这是您的编写器线程正在寻找数据的地方)

那么你需要改变你的信令逻辑。现在您的程序无法接受超过 BUFFER_SIZE 的输入,因为您的写入线程只会写入一次。它需要有一个循环。然后你需要一些机制来通知 writer thread 不再发送数据。例如你可以这样做

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }
    sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;


    sem_wait(&sem_full);
    while (thread_data->buf->numBytes > 0)
    {
        slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
        sem_post(&sem_empty);
        sem_wait(&sem_full);
    }
    pthread_exit(0);
}

希望不要再出现错误,没有测试解决方案。但是这个概念应该是你所要求的。