这是实现没有互斥量的异步消息队列的安全方法吗?
Is this a safe way to implement an asynchronous message queue without mutexes?
下面是我如何使用 while 循环实现带有简单检查的异步消息队列。如果它工作正常,我认为这是比使用互斥锁更好的方法。 运行 在我的机器上进行了一些测试,它似乎工作正常,但我真的不确定这是否安全,因为我在为多个 threads/processes 编写异步系统方面没有太多经验。我的工作是否安全地防止了竞争条件?还是它会在负载较重或任何其他情况下崩溃?
typedef struct MessageQueueElement {
Message message;
struct MessageQueueElement *next;
} MessageQueueElement;
typedef struct MessageQueue { //singly-linked list as a queue
MessageQueueElement *first;
MessageQueueElement *last;
bool sending;
} MessageQueue;
void createMessageQueue(MessageQueue *this) {
this->first = malloc(sizeof(MessageQueueElement));
this->last = this->first;
this->sending = false;
}
void sendMessage(MessageQueue *this, Message *message) {
while (this->sending);
//do nothing while this function is called from another thread
this->sending = true;
this->last->message = *message;
this->last = this->last->next = malloc(sizeof(MessageQueueElement));
//add a message to the queue
this->sending = false;
}
int waitMessage(MessageQueue *this, int (*readMessage)(unsigned, unsigned, void *)) {
while (this->first == this->last);
//do nothing while the queue is empty
int n = readMessage(this->first->message.type, this->first->message.code, this->first->message.data);
MessageQueueElement *temp = this->first;
this->first = this->first->next;
free(temp);
return n;
}
整个上下文和一些测试代码见下文。
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#define EXIT_MESSAGE 0
#define THREAD_MESSAGE 1
#define EXIT 0
#define CONTINUE 1
int readMessage(size_t type, size_t code, void *data) {
if (type == THREAD_MESSAGE) {
printf("message from thread %d: %s\n", code, (char *)data);
free(data);
} else {
return EXIT;
}
return CONTINUE;
}
MessageQueue mq;
int nThreads;
int counter = 0;
void *worker(void *p) {
double pi = 0.0;
for (int i = 0; i < 10; i += 1) {
for (int j = 0; j < 100000; j += 1) {
double n = i * 100000.0 + j;
pi += (4.0 / (8.0 * n + 1.0) - 2.0 / (8.0 * n + 4.0) - 1.0 / (8.0 * n + 5.0) - 1.0 / (8.0 * n + 6.0)) / pow(16.0, n);
}
char *s = malloc(100);
sprintf(s, "calculating pi... %d percent complete", (i + 1) * 10);
sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
}
char *s = malloc(100);
sprintf(s, "pi equals %.8f", pi);
sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
counter += 1;
if (counter == nThreads) {
sendMessage(&mq, &(Message){.type = EXIT_MESSAGE});
}
}
int main(int argc, char **argv) {
nThreads = atoi(argv[1]);
createMessageQueue(&mq);
pthread_t threads[nThreads];
for (int i = 0; i < nThreads; i += 1) {
pthread_create(&threads[i], NULL, worker, (void *)i);
}
while (waitMessage(&mq, readMessage));
for (int i = 0; i < nThreads; i += 1) {
pthread_join(threads[i], NULL);
}
return 0;
}
显然不行。对于初学者来说,如果两个线程都在等待队列变空,它们很可能同时发现发送 == false,并且都跳进去,然后事情以糟糕的方式出错。这正是互斥锁的用途。所以这是行不通的。
忙于等待变量也是一种非常糟糕的形式。如果您有一个四核 CPU,那么四个核很可能会花费 100% 的可用资源 CPU 只是在等待一个变量发生变化。不好。
并且由于发送不是易变的,您的编译器将看不到生成代码以将其设置为 true 的丝毫理由,因此这同样是行不通的。
要使其正常工作,您需要执行互斥体执行的所有操作。而且您必须 完全 正确地做所有事情。这在很大程度上取决于您使用的确切处理器。您需要了解缓存一致性、读写操作的顺序、内存屏障等,如果您甚至没有听说过这些,那么您就没有机会做对。
下面是我如何使用 while 循环实现带有简单检查的异步消息队列。如果它工作正常,我认为这是比使用互斥锁更好的方法。 运行 在我的机器上进行了一些测试,它似乎工作正常,但我真的不确定这是否安全,因为我在为多个 threads/processes 编写异步系统方面没有太多经验。我的工作是否安全地防止了竞争条件?还是它会在负载较重或任何其他情况下崩溃?
typedef struct MessageQueueElement {
Message message;
struct MessageQueueElement *next;
} MessageQueueElement;
typedef struct MessageQueue { //singly-linked list as a queue
MessageQueueElement *first;
MessageQueueElement *last;
bool sending;
} MessageQueue;
void createMessageQueue(MessageQueue *this) {
this->first = malloc(sizeof(MessageQueueElement));
this->last = this->first;
this->sending = false;
}
void sendMessage(MessageQueue *this, Message *message) {
while (this->sending);
//do nothing while this function is called from another thread
this->sending = true;
this->last->message = *message;
this->last = this->last->next = malloc(sizeof(MessageQueueElement));
//add a message to the queue
this->sending = false;
}
int waitMessage(MessageQueue *this, int (*readMessage)(unsigned, unsigned, void *)) {
while (this->first == this->last);
//do nothing while the queue is empty
int n = readMessage(this->first->message.type, this->first->message.code, this->first->message.data);
MessageQueueElement *temp = this->first;
this->first = this->first->next;
free(temp);
return n;
}
整个上下文和一些测试代码见下文。
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#define EXIT_MESSAGE 0
#define THREAD_MESSAGE 1
#define EXIT 0
#define CONTINUE 1
int readMessage(size_t type, size_t code, void *data) {
if (type == THREAD_MESSAGE) {
printf("message from thread %d: %s\n", code, (char *)data);
free(data);
} else {
return EXIT;
}
return CONTINUE;
}
MessageQueue mq;
int nThreads;
int counter = 0;
void *worker(void *p) {
double pi = 0.0;
for (int i = 0; i < 10; i += 1) {
for (int j = 0; j < 100000; j += 1) {
double n = i * 100000.0 + j;
pi += (4.0 / (8.0 * n + 1.0) - 2.0 / (8.0 * n + 4.0) - 1.0 / (8.0 * n + 5.0) - 1.0 / (8.0 * n + 6.0)) / pow(16.0, n);
}
char *s = malloc(100);
sprintf(s, "calculating pi... %d percent complete", (i + 1) * 10);
sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
}
char *s = malloc(100);
sprintf(s, "pi equals %.8f", pi);
sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
counter += 1;
if (counter == nThreads) {
sendMessage(&mq, &(Message){.type = EXIT_MESSAGE});
}
}
int main(int argc, char **argv) {
nThreads = atoi(argv[1]);
createMessageQueue(&mq);
pthread_t threads[nThreads];
for (int i = 0; i < nThreads; i += 1) {
pthread_create(&threads[i], NULL, worker, (void *)i);
}
while (waitMessage(&mq, readMessage));
for (int i = 0; i < nThreads; i += 1) {
pthread_join(threads[i], NULL);
}
return 0;
}
显然不行。对于初学者来说,如果两个线程都在等待队列变空,它们很可能同时发现发送 == false,并且都跳进去,然后事情以糟糕的方式出错。这正是互斥锁的用途。所以这是行不通的。
忙于等待变量也是一种非常糟糕的形式。如果您有一个四核 CPU,那么四个核很可能会花费 100% 的可用资源 CPU 只是在等待一个变量发生变化。不好。
并且由于发送不是易变的,您的编译器将看不到生成代码以将其设置为 true 的丝毫理由,因此这同样是行不通的。
要使其正常工作,您需要执行互斥体执行的所有操作。而且您必须 完全 正确地做所有事情。这在很大程度上取决于您使用的确切处理器。您需要了解缓存一致性、读写操作的顺序、内存屏障等,如果您甚至没有听说过这些,那么您就没有机会做对。