在多线程生产者和消费者应用程序中读取时发生访问冲突
Access violation while reading in a multithreaded producer and consumer application
我制作了一个多线程 producer/consumer 应用程序,我已经为此苦苦挣扎了几天。生产者将斐波那契数放入循环缓冲区,消费者从缓冲区中取出数字,直到达到指定的限制。
我已经用互斥锁(互斥)保护了循环缓冲区,它应该可以防止多个线程访问相同的数据。我还设置了一些事件,以防止生产者溢出缓冲区,并防止消费者在缓冲区为空时访问缓冲区。
虽然我这么说,但我仍然注意到消费者在缓冲区为空时正在访问该缓冲区。这就是为什么我在消费者线程中添加了一个中断(我不太明白为什么这是必要的)。
我偶尔也会收到 "Access violation reading location" 错误,这是我无法理解的。我注意到这些在创建更多线程时更频繁地发生,几乎总是发生。我认为这些可能会发生,因为消费者试图在不存在的位置读取缓冲区,但我发现情况并非如此。
可能是什么导致了我的问题?有没有可能多个线程同时通过了针对同一个 Mutex 的 WaitForSingleObject 等待?
这是Fibonacci.c
#include "Fibonacci.h"
/* Lower and upper bound for the number of producer and consumer threads the
 * user may request; MAXIMUM also sizes the thread handle/id arrays in main(). */
#define MINIMUM 1
#define MAXIMUM 5
/* Auto-reset event, created signaled in main(); producers wait on it before
 * putting an element and consumers set it after taking one. */
HANDLE eBufferNotFull;
/* Auto-reset event, created non-signaled in main(); consumers wait on it before
 * taking an element and producers set it after putting one. */
HANDLE eBufferNotEmpty;
/* Held by a producer while it updates and reads `numbers`. */
HANDLE fiboMutex;
/* Held around every putElement()/getElement() call on `buffer`. */
HANDLE bufferMutex;
/* Circular buffer shared by all threads; allocated in main() via createBuffer(). */
CircularBuffer *buffer;
/* Current state of the Fibonacci sequence shared by all producers. */
Fibonumbers numbers;
int main(void) {
uint8_t amountOfProducers, amountOfConsumers, size;
ThreadStruct consumerInfo, producerInfo;
setValue("The amount of producers", &amountOfProducers, MINIMUM, MAXIMUM);
setValue("The amount of consumers", &amountOfConsumers, MINIMUM, MAXIMUM);
setValue("The size of the buffer", &size, 1, 80);
resetFibo(&numbers);
setValue("The sleeping time for producers", &producerInfo.sleep, 0, 10000);
setValue("The sleeping time for consumers", &consumerInfo.sleep, 0, 10000);
setValue("The limit for the fibonumber", &producerInfo.limit, 0, 35000000000000000);
consumerInfo.limit = producerInfo.limit;
HANDLE hProducer[MAXIMUM];
DWORD dwProducer[MAXIMUM];
HANDLE hConsumer[MAXIMUM];
DWORD dwConsumer[MAXIMUM];
buffer = createBuffer(size);
/* Create the Mutexes */
fiboMutex = CreateMutex(NULL, FALSE, NULL);
bufferMutex = CreateMutex(NULL, FALSE, NULL);
/* Create the Events */
eBufferNotFull = CreateEvent(NULL, FALSE, TRUE, TEXT("buffer_niet_vol"));
eBufferNotEmpty = CreateEvent(NULL, FALSE, FALSE, TEXT("buffer_niet_leeg"));
/* Create the producer threads*/
for (int i = 0; i < amountOfProducers; ++i) {
hProducer[i] = CreateThread(NULL, // No security
0, // Use default stack size
(LPTHREAD_START_ROUTINE)producer,
&producerInfo, // Thread argument
0, // Child became running
(LPDWORD)&dwProducer[i]); // Child id
}
/* Create the consumer threads*/
for (int i = 0; i < amountOfConsumers; ++i) {
hConsumer[i] = CreateThread(NULL, // No security
0, // Use default stack size
(LPTHREAD_START_ROUTINE)consumer,
&consumerInfo, // Thread argument
0, // Child became running
(LPDWORD)&dwConsumer[i]); // Child id
}
WaitForMultipleObjects(amountOfProducers, hProducer, true, INFINITE);
WaitForMultipleObjects(amountOfConsumers, hConsumer, true, INFINITE);
deleteBuffer(buffer);
return (EXIT_SUCCESS);
}
/*
 * Producer thread routine. lpParameter points to the ThreadStruct holding the
 * sleep time and the Fibonacci limit.
 *
 * Each round: sleep, take fiboMutex, compute the next Fibonacci number and,
 * unless it exceeds the limit, wait for room in the buffer and store it.
 * The thread exits as soon as the number exceeds the limit.
 */
DWORD WINAPI producer(LPVOID lpParameter) {
    ThreadStruct *settings = (ThreadStruct *)lpParameter;

    for (;;) {
        Sleep(settings->sleep);

        /* fiboMutex stays held until the new number has been put into the buffer. */
        WaitForSingleObject(fiboMutex, INFINITE);
        createNewFibonumber();
        if (numbers.currentFibo > settings->limit) {
            break;
        }

        WaitForSingleObject(eBufferNotFull, INFINITE);
        WaitForSingleObject(bufferMutex, INFINITE);
        putElement(buffer, numbers.currentFibo);
        ReleaseMutex(fiboMutex);
        ReleaseMutex(bufferMutex);
        SetEvent(eBufferNotEmpty);
    }

    /* Limit exceeded: hand the Fibonacci state back before leaving the thread. */
    ReleaseMutex(fiboMutex);
    ExitThread(EXIT_SUCCESS);
}
/*
 * Consumer thread routine. lpParameter points to the ThreadStruct holding the
 * sleep time.
 *
 * Each round: sleep, wait until the buffer is reported non-empty, take one
 * element under bufferMutex, print it and signal that the buffer is not full.
 *
 * NOTE(review): the loop has no exit condition (info->limit is never read),
 * so a consumer thread never terminates on its own.
 */
DWORD WINAPI consumer(LPVOID lpParameter) {
    ThreadStruct *info = (ThreadStruct *)lpParameter;
    while (true) {
        Sleep(info->sleep);
        WaitForSingleObject(eBufferNotEmpty, INFINITE);
        WaitForSingleObject(bufferMutex, INFINITE);
        /* getElement() returns a uint64_t; printing it with "%i" is undefined
         * behaviour and truncates large Fibonacci numbers. */
        printf(" fibogetal: %llu \n", (unsigned long long)getElement(buffer));
        ReleaseMutex(bufferMutex);
        SetEvent(eBufferNotFull);
    }
    /* Not reached while the loop above has no exit condition. */
    ExitThread(EXIT_SUCCESS);
}
/*
 * Advances the global Fibonacci state by one step: the current number becomes
 * the last one and the new current number is the sum of the two.
 * Performs no locking of its own.
 */
void createNewFibonumber() {
    const uint64_t previous = numbers.currentFibo;

    numbers.currentFibo = previous + numbers.lastFibo;
    numbers.lastFibo = previous;
}
/*
 * Puts *numbers back at the start of the sequence:
 * lastFibo becomes 0 and currentFibo becomes 1.
 */
void resetFibo(Fibonumbers *numbers) {
    numbers->currentFibo = 1;
    numbers->lastFibo = 0;
}
/*
 * Prints `text`, then prompts on stdin until the user enters a number in
 * [minimum, maximum].
 *
 * text       - description printed once before the first prompt.
 * intpointer - destination; the result is stored as a uint64_t, so it MUST
 *              point to storage at least 8 bytes wide. Passing the address of
 *              a narrower variable overwrites adjacent memory.
 * minimum    - smallest accepted value (inclusive).
 * maximum    - largest accepted value (inclusive).
 *
 * Non-numeric input is discarded up to the end of the line and the prompt is
 * repeated. If stdin reaches end-of-file, `minimum` is stored so the function
 * cannot loop forever.
 */
void setValue(char *text, void *intpointer, uint64_t minimum, uint64_t maximum) {
    unsigned long long value = 0;

    printf("%s\n", text);
    for (;;) {
        /* The values are unsigned 64-bit, so the matching conversion is %llu. */
        printf("Enter a value from %llu up to %llu : ",
               (unsigned long long)minimum, (unsigned long long)maximum);
        int fields = scanf_s("%llu", &value);

        if (fields == 1 && value >= minimum && value <= maximum) {
            break;
        }
        if (fields == EOF) {
            /* No more input will ever arrive; fall back to a valid value. */
            value = minimum;
            break;
        }
        if (fields != 1) {
            /* Drop the unparsable text, otherwise scanf_s would fail on the
             * same characters again on every iteration. */
            int c;
            while ((c = getchar()) != '\n' && c != EOF) {
            }
        }
    }
    *(uint64_t *)intpointer = value;
}
Fibonacci.h
#include <stdio.h>
#include <stdint.h>
#include <conio.h>
#include <Windows.h>
#include "Buffer.h"
/* State of the Fibonacci sequence: the two most recent numbers. */
typedef struct {
/* Most recently generated number; this is the value producers put into the buffer. */
uint64_t currentFibo;
/* The number generated before currentFibo. */
uint64_t lastFibo;
} Fibonumbers;
/* Settings handed to a producer or consumer thread as its thread argument. */
typedef struct {
/* Producers exit once the generated Fibonacci number exceeds this value. */
uint64_t limit;
/* Value passed to Sleep() at the start of every loop iteration. */
uint16_t sleep;
} ThreadStruct;
/*
* Thread routine for a producer; lpParameter points to a ThreadStruct.
* In a loop: sleeps, computes the next Fibonacci number and puts it into the
* shared buffer. Exits the thread once the number exceeds the limit.
*/
DWORD WINAPI producer(LPVOID lpParameter);
/*
* Thread routine for a consumer; lpParameter points to a ThreadStruct.
* In a loop: sleeps, takes one number from the shared buffer and prints it.
*/
DWORD WINAPI consumer(LPVOID lpParameter);
/*
* Advances the global Fibonacci state by one step.
*/
void createNewFibonumber();
/*
* Resets *numbers to the start of the sequence (lastFibo = 0, currentFibo = 1).
*/
void resetFibo(Fibonumbers *numbers);
/*
* Prints text, then prompts until the entered value lies in [minimum, maximum].
* The value is stored as a uint64_t through intpointer, so intpointer must
* point to storage that is at least 8 bytes wide.
*/
void setValue(char *text, void *intpointer, uint64_t minimum, uint64_t maximum);
还有 Buffer.c
#include "Buffer.h"
/*
 * Allocates an empty circular buffer with room for `size` elements.
 *
 * Returns the new buffer, or NULL when memory could not be allocated.
 * The caller owns the result and releases it with deleteBuffer().
 */
CircularBuffer *createBuffer(uint8_t size) {
    CircularBuffer *buffer = calloc(1, sizeof *buffer);
    if (buffer == NULL) {
        return NULL;
    }

    /* calloc() already zeroed count, start and end. */
    buffer->size = size;
    buffer->buffer = calloc(size, sizeof *buffer->buffer);
    /* calloc(0, ...) may legitimately return NULL, so only a non-zero size
     * counts as an allocation failure. */
    if (buffer->buffer == NULL && size != 0) {
        free(buffer);
        return NULL;
    }
    return buffer;
}
/*
 * Releases a buffer created by createBuffer(), including its element storage.
 * A NULL argument is ignored.
 */
void deleteBuffer(CircularBuffer *buffer) {
    if (!buffer) {
        return;
    }

    free(buffer->buffer);
    free(buffer);
}
/*
 * Stores `element` at the write position (`start`), advances that position
 * with wrap-around at `size`, increments the element count and prints the
 * new count. Does not check whether the buffer is already full.
 */
void putElement(CircularBuffer *buffer, uint64_t element) {
    buffer->buffer[buffer->start++] = element;
    if (buffer->start == buffer->size) {
        buffer->start = 0; /* wrap around to the first slot */
    }
    ++buffer->count;
    printf("put: %i items in buffer.\n", buffer->count);
}
/*
 * Reads the element at the read position (`end`), advances that position with
 * wrap-around at `size`, decrements the element count and prints the new
 * count. Does not check whether the buffer is empty.
 *
 * Returns the element that was read.
 */
uint64_t getElement(CircularBuffer *buffer) {
    const uint64_t oldest = buffer->buffer[buffer->end++];

    if (buffer->end == buffer->size) {
        buffer->end = 0; /* wrap around to the first slot */
    }
    --buffer->count;
    printf(" get: %i items in buffer.\n", buffer->count);
    return oldest;
}
/* Returns true when the number of stored elements equals the capacity. */
bool isBufferFull(CircularBuffer *buffer) {
    const bool full = (buffer->size == buffer->count);
    return full;
}
/* Returns true when the buffer holds no elements. */
bool isBufferEmpty(CircularBuffer *buffer) {
    const bool empty = (0 == buffer->count);
    return empty;
}
Buffer.h
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
/* Fixed-capacity circular buffer of uint64_t values. */
typedef struct {
/* Element storage, allocated in createBuffer() with room for `size` values. */
uint64_t *buffer;
/* Capacity in elements. */
uint8_t size;
/* Number of elements currently stored. */
uint8_t count;
/* Index that putElement() writes to next. */
uint8_t start;
/* Index that getElement() reads from next. */
uint8_t end;
} CircularBuffer;
/* Allocates an empty circular buffer with room for `size` elements. */
CircularBuffer *createBuffer(uint8_t size);
/* Frees a buffer and its element storage; a NULL buffer is ignored. */
void deleteBuffer(CircularBuffer *buffer);
/* Appends `element`; does not check whether the buffer is full. */
void putElement(CircularBuffer *buffer, uint64_t element);
/* Removes and returns the oldest element; does not check whether the buffer is empty. */
uint64_t getElement(CircularBuffer *buffer);
/* Returns true when count equals size. */
bool isBufferFull(CircularBuffer *buffer);
/* Returns true when count is 0. */
bool isBufferEmpty(CircularBuffer *buffer);
如果有人还想查看头文件,请说明。
编辑:我更新了代码,现在可以正常使用了。
edit2: 当我在调试模式下构建程序时,程序可以运行,但在发布模式下构建时,它似乎没有启动线程。
分配大小错误
我发现这一行非常可疑:
buffer->buffer = (uint64_t *)calloc(buffer->size, sizeof(unsigned int));
如果 buffer->buffer 是 uint64_t 的数组,为什么要用 sizeof(unsigned int) 来分配呢?unsigned int 通常只有 4 个字节,而 uint64_t 是 8 个字节,这样分配到的空间只有所需的一半,写入后半部分元素时就会越界。我不知道这是否就是您的问题所在,但这至少是应该修正的一处。
我制作了一个多线程 producer/consumer 应用程序,我已经为此苦苦挣扎了几天。生产者将斐波那契数放入循环缓冲区,消费者从缓冲区中取出数字,直到达到指定的限制。
我已经用互斥锁(互斥)保护了循环缓冲区,它应该可以防止多个线程访问相同的数据。我还设置了一些事件,以防止生产者溢出缓冲区,并防止消费者在缓冲区为空时访问缓冲区。
虽然我这么说,但我仍然注意到消费者在缓冲区为空时正在访问该缓冲区。这就是为什么我在消费者线程中添加了一个中断(我不太明白为什么这是必要的)。
我偶尔也会收到 "Access violation reading location" 错误,这是我无法理解的。我注意到这些在创建更多线程时更频繁地发生,几乎总是发生。我认为这些可能会发生,因为消费者试图在不存在的位置读取缓冲区,但我发现情况并非如此。
可能是什么导致了我的问题?有没有可能多个线程同时通过了针对同一个 Mutex 的 WaitForSingleObject 等待?
这是Fibonacci.c
#include "Fibonacci.h"
/* Lower and upper bound for the number of producer and consumer threads the
 * user may request; MAXIMUM also sizes the thread handle/id arrays in main(). */
#define MINIMUM 1
#define MAXIMUM 5
/* Auto-reset event, created signaled in main(); producers wait on it before
 * putting an element and consumers set it after taking one. */
HANDLE eBufferNotFull;
/* Auto-reset event, created non-signaled in main(); consumers wait on it before
 * taking an element and producers set it after putting one. */
HANDLE eBufferNotEmpty;
/* Held by a producer while it updates and reads `numbers`. */
HANDLE fiboMutex;
/* Held around every putElement()/getElement() call on `buffer`. */
HANDLE bufferMutex;
/* Circular buffer shared by all threads; allocated in main() via createBuffer(). */
CircularBuffer *buffer;
/* Current state of the Fibonacci sequence shared by all producers. */
Fibonumbers numbers;
int main(void) {
uint8_t amountOfProducers, amountOfConsumers, size;
ThreadStruct consumerInfo, producerInfo;
setValue("The amount of producers", &amountOfProducers, MINIMUM, MAXIMUM);
setValue("The amount of consumers", &amountOfConsumers, MINIMUM, MAXIMUM);
setValue("The size of the buffer", &size, 1, 80);
resetFibo(&numbers);
setValue("The sleeping time for producers", &producerInfo.sleep, 0, 10000);
setValue("The sleeping time for consumers", &consumerInfo.sleep, 0, 10000);
setValue("The limit for the fibonumber", &producerInfo.limit, 0, 35000000000000000);
consumerInfo.limit = producerInfo.limit;
HANDLE hProducer[MAXIMUM];
DWORD dwProducer[MAXIMUM];
HANDLE hConsumer[MAXIMUM];
DWORD dwConsumer[MAXIMUM];
buffer = createBuffer(size);
/* Create the Mutexes */
fiboMutex = CreateMutex(NULL, FALSE, NULL);
bufferMutex = CreateMutex(NULL, FALSE, NULL);
/* Create the Events */
eBufferNotFull = CreateEvent(NULL, FALSE, TRUE, TEXT("buffer_niet_vol"));
eBufferNotEmpty = CreateEvent(NULL, FALSE, FALSE, TEXT("buffer_niet_leeg"));
/* Create the producer threads*/
for (int i = 0; i < amountOfProducers; ++i) {
hProducer[i] = CreateThread(NULL, // No security
0, // Use default stack size
(LPTHREAD_START_ROUTINE)producer,
&producerInfo, // Thread argument
0, // Child became running
(LPDWORD)&dwProducer[i]); // Child id
}
/* Create the consumer threads*/
for (int i = 0; i < amountOfConsumers; ++i) {
hConsumer[i] = CreateThread(NULL, // No security
0, // Use default stack size
(LPTHREAD_START_ROUTINE)consumer,
&consumerInfo, // Thread argument
0, // Child became running
(LPDWORD)&dwConsumer[i]); // Child id
}
WaitForMultipleObjects(amountOfProducers, hProducer, true, INFINITE);
WaitForMultipleObjects(amountOfConsumers, hConsumer, true, INFINITE);
deleteBuffer(buffer);
return (EXIT_SUCCESS);
}
/*
 * Producer thread routine. lpParameter points to the ThreadStruct holding the
 * sleep time and the Fibonacci limit.
 *
 * Each round: sleep, take fiboMutex, compute the next Fibonacci number and,
 * unless it exceeds the limit, wait for room in the buffer and store it.
 * The thread exits as soon as the number exceeds the limit.
 */
DWORD WINAPI producer(LPVOID lpParameter) {
    ThreadStruct *settings = (ThreadStruct *)lpParameter;

    for (;;) {
        Sleep(settings->sleep);

        /* fiboMutex stays held until the new number has been put into the buffer. */
        WaitForSingleObject(fiboMutex, INFINITE);
        createNewFibonumber();
        if (numbers.currentFibo > settings->limit) {
            break;
        }

        WaitForSingleObject(eBufferNotFull, INFINITE);
        WaitForSingleObject(bufferMutex, INFINITE);
        putElement(buffer, numbers.currentFibo);
        ReleaseMutex(fiboMutex);
        ReleaseMutex(bufferMutex);
        SetEvent(eBufferNotEmpty);
    }

    /* Limit exceeded: hand the Fibonacci state back before leaving the thread. */
    ReleaseMutex(fiboMutex);
    ExitThread(EXIT_SUCCESS);
}
/*
 * Consumer thread routine. lpParameter points to the ThreadStruct holding the
 * sleep time.
 *
 * Each round: sleep, wait until the buffer is reported non-empty, take one
 * element under bufferMutex, print it and signal that the buffer is not full.
 *
 * NOTE(review): the loop has no exit condition (info->limit is never read),
 * so a consumer thread never terminates on its own.
 */
DWORD WINAPI consumer(LPVOID lpParameter) {
    ThreadStruct *info = (ThreadStruct *)lpParameter;
    while (true) {
        Sleep(info->sleep);
        WaitForSingleObject(eBufferNotEmpty, INFINITE);
        WaitForSingleObject(bufferMutex, INFINITE);
        /* getElement() returns a uint64_t; printing it with "%i" is undefined
         * behaviour and truncates large Fibonacci numbers. */
        printf(" fibogetal: %llu \n", (unsigned long long)getElement(buffer));
        ReleaseMutex(bufferMutex);
        SetEvent(eBufferNotFull);
    }
    /* Not reached while the loop above has no exit condition. */
    ExitThread(EXIT_SUCCESS);
}
/*
 * Advances the global Fibonacci state by one step: the current number becomes
 * the last one and the new current number is the sum of the two.
 * Performs no locking of its own.
 */
void createNewFibonumber() {
    const uint64_t previous = numbers.currentFibo;

    numbers.currentFibo = previous + numbers.lastFibo;
    numbers.lastFibo = previous;
}
/*
 * Puts *numbers back at the start of the sequence:
 * lastFibo becomes 0 and currentFibo becomes 1.
 */
void resetFibo(Fibonumbers *numbers) {
    numbers->currentFibo = 1;
    numbers->lastFibo = 0;
}
/*
 * Prints `text`, then prompts on stdin until the user enters a number in
 * [minimum, maximum].
 *
 * text       - description printed once before the first prompt.
 * intpointer - destination; the result is stored as a uint64_t, so it MUST
 *              point to storage at least 8 bytes wide. Passing the address of
 *              a narrower variable overwrites adjacent memory.
 * minimum    - smallest accepted value (inclusive).
 * maximum    - largest accepted value (inclusive).
 *
 * Non-numeric input is discarded up to the end of the line and the prompt is
 * repeated. If stdin reaches end-of-file, `minimum` is stored so the function
 * cannot loop forever.
 */
void setValue(char *text, void *intpointer, uint64_t minimum, uint64_t maximum) {
    unsigned long long value = 0;

    printf("%s\n", text);
    for (;;) {
        /* The values are unsigned 64-bit, so the matching conversion is %llu. */
        printf("Enter a value from %llu up to %llu : ",
               (unsigned long long)minimum, (unsigned long long)maximum);
        int fields = scanf_s("%llu", &value);

        if (fields == 1 && value >= minimum && value <= maximum) {
            break;
        }
        if (fields == EOF) {
            /* No more input will ever arrive; fall back to a valid value. */
            value = minimum;
            break;
        }
        if (fields != 1) {
            /* Drop the unparsable text, otherwise scanf_s would fail on the
             * same characters again on every iteration. */
            int c;
            while ((c = getchar()) != '\n' && c != EOF) {
            }
        }
    }
    *(uint64_t *)intpointer = value;
}
Fibonacci.h
#include <stdio.h>
#include <stdint.h>
#include <conio.h>
#include <Windows.h>
#include "Buffer.h"
/* State of the Fibonacci sequence: the two most recent numbers. */
typedef struct {
/* Most recently generated number; this is the value producers put into the buffer. */
uint64_t currentFibo;
/* The number generated before currentFibo. */
uint64_t lastFibo;
} Fibonumbers;
/* Settings handed to a producer or consumer thread as its thread argument. */
typedef struct {
/* Producers exit once the generated Fibonacci number exceeds this value. */
uint64_t limit;
/* Value passed to Sleep() at the start of every loop iteration. */
uint16_t sleep;
} ThreadStruct;
/*
* Thread routine for a producer; lpParameter points to a ThreadStruct.
* In a loop: sleeps, computes the next Fibonacci number and puts it into the
* shared buffer. Exits the thread once the number exceeds the limit.
*/
DWORD WINAPI producer(LPVOID lpParameter);
/*
* Thread routine for a consumer; lpParameter points to a ThreadStruct.
* In a loop: sleeps, takes one number from the shared buffer and prints it.
*/
DWORD WINAPI consumer(LPVOID lpParameter);
/*
* Advances the global Fibonacci state by one step.
*/
void createNewFibonumber();
/*
* Resets *numbers to the start of the sequence (lastFibo = 0, currentFibo = 1).
*/
void resetFibo(Fibonumbers *numbers);
/*
* Prints text, then prompts until the entered value lies in [minimum, maximum].
* The value is stored as a uint64_t through intpointer, so intpointer must
* point to storage that is at least 8 bytes wide.
*/
void setValue(char *text, void *intpointer, uint64_t minimum, uint64_t maximum);
还有 Buffer.c
#include "Buffer.h"
/*
 * Allocates an empty circular buffer with room for `size` elements.
 *
 * Returns the new buffer, or NULL when memory could not be allocated.
 * The caller owns the result and releases it with deleteBuffer().
 */
CircularBuffer *createBuffer(uint8_t size) {
    CircularBuffer *buffer = calloc(1, sizeof *buffer);
    if (buffer == NULL) {
        return NULL;
    }

    /* calloc() already zeroed count, start and end. */
    buffer->size = size;
    buffer->buffer = calloc(size, sizeof *buffer->buffer);
    /* calloc(0, ...) may legitimately return NULL, so only a non-zero size
     * counts as an allocation failure. */
    if (buffer->buffer == NULL && size != 0) {
        free(buffer);
        return NULL;
    }
    return buffer;
}
/*
 * Releases a buffer created by createBuffer(), including its element storage.
 * A NULL argument is ignored.
 */
void deleteBuffer(CircularBuffer *buffer) {
    if (!buffer) {
        return;
    }

    free(buffer->buffer);
    free(buffer);
}
/*
 * Stores `element` at the write position (`start`), advances that position
 * with wrap-around at `size`, increments the element count and prints the
 * new count. Does not check whether the buffer is already full.
 */
void putElement(CircularBuffer *buffer, uint64_t element) {
    buffer->buffer[buffer->start++] = element;
    if (buffer->start == buffer->size) {
        buffer->start = 0; /* wrap around to the first slot */
    }
    ++buffer->count;
    printf("put: %i items in buffer.\n", buffer->count);
}
/*
 * Reads the element at the read position (`end`), advances that position with
 * wrap-around at `size`, decrements the element count and prints the new
 * count. Does not check whether the buffer is empty.
 *
 * Returns the element that was read.
 */
uint64_t getElement(CircularBuffer *buffer) {
    const uint64_t oldest = buffer->buffer[buffer->end++];

    if (buffer->end == buffer->size) {
        buffer->end = 0; /* wrap around to the first slot */
    }
    --buffer->count;
    printf(" get: %i items in buffer.\n", buffer->count);
    return oldest;
}
/* Returns true when the number of stored elements equals the capacity. */
bool isBufferFull(CircularBuffer *buffer) {
    const bool full = (buffer->size == buffer->count);
    return full;
}
/* Returns true when the buffer holds no elements. */
bool isBufferEmpty(CircularBuffer *buffer) {
    const bool empty = (0 == buffer->count);
    return empty;
}
Buffer.h
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
/* Fixed-capacity circular buffer of uint64_t values. */
typedef struct {
/* Element storage, allocated in createBuffer() with room for `size` values. */
uint64_t *buffer;
/* Capacity in elements. */
uint8_t size;
/* Number of elements currently stored. */
uint8_t count;
/* Index that putElement() writes to next. */
uint8_t start;
/* Index that getElement() reads from next. */
uint8_t end;
} CircularBuffer;
/* Allocates an empty circular buffer with room for `size` elements. */
CircularBuffer *createBuffer(uint8_t size);
/* Frees a buffer and its element storage; a NULL buffer is ignored. */
void deleteBuffer(CircularBuffer *buffer);
/* Appends `element`; does not check whether the buffer is full. */
void putElement(CircularBuffer *buffer, uint64_t element);
/* Removes and returns the oldest element; does not check whether the buffer is empty. */
uint64_t getElement(CircularBuffer *buffer);
/* Returns true when count equals size. */
bool isBufferFull(CircularBuffer *buffer);
/* Returns true when count is 0. */
bool isBufferEmpty(CircularBuffer *buffer);
如果有人还想查看头文件,请说明。
编辑:我更新了代码,现在可以正常使用了。 edit2: 当我在调试模式下构建程序时,程序可以运行,但在发布模式下构建时,它似乎没有启动线程。
分配大小错误
我发现这一行非常可疑:
buffer->buffer = (uint64_t *)calloc(buffer->size, sizeof(unsigned int));
如果 buffer->buffer 是 uint64_t 的数组,为什么要用 sizeof(unsigned int) 来分配呢?unsigned int 通常只有 4 个字节,而 uint64_t 是 8 个字节,这样分配到的空间只有所需的一半,写入后半部分元素时就会越界。我不知道这是否就是您的问题所在,但这至少是应该修正的一处。