使用线程的多个缓冲区
multiple buffers using threads
我正在编写的多线程程序需要一些算法帮助。它基本上是 unix 中的 cp 命令,但是有一个读线程和一个写线程。我正在使用信号量进行线程同步。我有定义为
的缓冲区和线程数据结构
struct bufType {
char buf[BUFFER_SIZE];
int numBytes;
};
struct threadData {
int fd;
bufType buf;
};
和一个bufType的全局数组。我的主要代码是
int main(int argc, const char * argv[])
{
int in, out;
pthread_t Producer, Consumer;
threadData producerData, consumerData;
if (argc != 3)
{
cout << "Error: incorrect number of params" << endl;
exit(0);
}
if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
{
cout << "Error: cannot open input file" << endl;
exit(0);
}
if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
{
cout << "Cannot create output file" << endl;
exit(0);
}
sem_init(&sem_empty, 0, NUM_BUFFERS);
sem_init(&sem_full, 0, 0);
pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);
pthread_join(Producer, NULL);
pthread_join(Consumer, NULL);
return 0;
}
以及读写线程:
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
sem_wait(&sem_full);
slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
sem_post(&sem_empty);
pthread_exit(0);
}
所以我的问题是在主线程中为我的 threadData 变量分配什么,以及在读写线程中为我的信号量逻辑分配什么。感谢您提供的任何帮助
您可以使用公共缓冲池,循环数组或 linked 列表。这是一个 link 到 Windows 示例的 zip 压缩包,它与您所问的类似,使用 linked 列表作为线程间消息传递系统的一部分来缓冲数据。除了互斥量、信号量和写线程的创建之外,这些函数都小而简单。 mtcopy.zip.
作为一个 windows 不使用文件描述符的人,我可能对 in 和 out 有误,但我认为这需要在您的 main 中完成才能设置 threadData 结构。
producerData.fd = in;
consumerData.fd = out;
然后为两个结构声明一个 bufType 类型的单一对象。例如,将 threadData 的定义更改为
struct threadData {
int fd;
bufType* buf;
};
然后在您的 Main 中,您写
bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;
然后两个线程将使用公共缓冲区。否则您将写入生产者数据缓冲区,但消费者数据缓冲区将保持为空(这是您的编写器线程正在寻找数据的地方)
那么你需要改变你的信令逻辑。现在您的程序无法接受超过 BUFFER_SIZE
的输入,因为您的写入线程只会写入一次。它需要有一个循环。然后你需要一些机制来通知 writer thread 不再发送数据。例如你可以这样做
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
sem_wait(&sem_full);
while (thread_data->buf->numBytes > 0)
{
slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
sem_post(&sem_empty);
sem_wait(&sem_full);
}
pthread_exit(0);
}
希望不要再出现错误,没有测试解决方案。但是这个概念应该是你所要求的。
我正在编写的多线程程序需要一些算法帮助。它基本上是 unix 中的 cp 命令,但是有一个读线程和一个写线程。我正在使用信号量进行线程同步。我有定义为
的缓冲区和线程数据结构struct bufType {
char buf[BUFFER_SIZE];
int numBytes;
};
struct threadData {
int fd;
bufType buf;
};
和一个bufType的全局数组。我的主要代码是
int main(int argc, const char * argv[])
{
int in, out;
pthread_t Producer, Consumer;
threadData producerData, consumerData;
if (argc != 3)
{
cout << "Error: incorrect number of params" << endl;
exit(0);
}
if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
{
cout << "Error: cannot open input file" << endl;
exit(0);
}
if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
{
cout << "Cannot create output file" << endl;
exit(0);
}
sem_init(&sem_empty, 0, NUM_BUFFERS);
sem_init(&sem_full, 0, 0);
pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);
pthread_join(Producer, NULL);
pthread_join(Consumer, NULL);
return 0;
}
以及读写线程:
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
sem_wait(&sem_full);
slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
sem_post(&sem_empty);
pthread_exit(0);
}
所以我的问题是在主线程中为我的 threadData 变量分配什么,以及在读写线程中为我的信号量逻辑分配什么。感谢您提供的任何帮助
您可以使用公共缓冲池,循环数组或 linked 列表。这是一个 link 到 Windows 示例的 zip 压缩包,它与您所问的类似,使用 linked 列表作为线程间消息传递系统的一部分来缓冲数据。除了互斥量、信号量和写线程的创建之外,这些函数都小而简单。 mtcopy.zip.
作为一个 windows 不使用文件描述符的人,我可能对 in 和 out 有误,但我认为这需要在您的 main 中完成才能设置 threadData 结构。
producerData.fd = in;
consumerData.fd = out;
然后为两个结构声明一个 bufType 类型的单一对象。例如,将 threadData 的定义更改为
struct threadData {
int fd;
bufType* buf;
};
然后在您的 Main 中,您写
bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;
然后两个线程将使用公共缓冲区。否则您将写入生产者数据缓冲区,但消费者数据缓冲区将保持为空(这是您的编写器线程正在寻找数据的地方)
那么你需要改变你的信令逻辑。现在您的程序无法接受超过 BUFFER_SIZE
的输入,因为您的写入线程只会写入一次。它需要有一个循环。然后你需要一些机制来通知 writer thread 不再发送数据。例如你可以这样做
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
sem_wait(&sem_full);
while (thread_data->buf->numBytes > 0)
{
slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
sem_post(&sem_empty);
sem_wait(&sem_full);
}
pthread_exit(0);
}
希望不要再出现错误,没有测试解决方案。但是这个概念应该是你所要求的。