线程池执行在 C 中的任意点停止
Threadpool execution stops at an arbitrary point in C
我正在用 C 实现我自己的通用 Threadpool 算法,使用斐波那契数列进行测试,在过去的几天里,我一直被一个完全困扰我的问题所困扰。
当执行程序时,它会一直正常工作,直到某个时刻突然停止,而我看不出任何明显的原因。
我注意到的一件事是执行会在一小段时间后停止,因为如果向其中添加打印命令或睡眠命令,它会在执行中提前停止。
编辑:之前漏掉了这一点:我已经检查过死锁,并没有发现死锁;看起来只是在某个时刻不再有新任务被压入堆栈,导致所有线程都尝试从堆栈中取任务,发现堆栈为空后又跳回开头,无限地重复这一过程。
代码如下:
threadpool.h
#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <stddef.h>
#include <stdbool.h>
typedef void (*ThreadTask_f)(void*);
/* Handle for one unit of work submitted to the pool. */
typedef struct Future {
ThreadTask_f fn; // function to execute; the pool calls it with a pointer to this Future
bool fulfilled; // set to true by the executing thread after fn returns; polled by tpAwait()
// NOTE(review): fulfilled is a plain bool written by one thread and polled by
// another without synchronization -- consider an atomic type; confirm.
} Future;
extern int tpInit(size_t size);
extern void tpRelease(void);
extern void tpAsync(Future *future);
extern void tpAwait(Future *future);
/* Generates a typed wrapper around the thread pool for one function.
 * TYPE: type that the function returns
 * NAME: name of the function to be parallelised
 * ARG: type of the single argument the function takes
 *
 * The expansion provides:
 *   - a prototype `TYPE NAME(ARG)`
 *   - `NAME_fut`: struct bundling the Future, the argument and the result
 *   - `NAMEThunk`: adapter with the ThreadTask_f signature; it treats the
 *     pointer it receives as a NAME_fut, which works because `fut` is the
 *     first member of that struct
 *   - `NAMEFuture(arg)`: builds an unfulfilled NAME_fut by value
 *   - `NAMEAsync(f)`: submits f through tpAsync() and returns f
 *   - `NAMEAwait(f)`: waits through tpAwait() and returns f->res
 */
#define TASK(TYPE, NAME, ARG) \
TYPE NAME(ARG); \
\
typedef struct { \
Future fut; \
ARG arg; \
TYPE res; \
} NAME ## _fut; \
\
static void NAME ## Thunk(void *args) { \
NAME ## _fut *data = args; \
data->res = NAME(data->arg); \
} \
static inline NAME ## _fut NAME ## Future(ARG arg) { \
return (NAME ## _fut) { \
.fut = { .fn = &NAME ## Thunk, .fulfilled = false }, \
.arg = arg \
}; \
} \
static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
tpAsync(&future->fut); \
return future; \
} \
static inline TYPE NAME ## Await(NAME ## _fut *future) { \
tpAwait(&future->fut); \
return future->res; \
}
#endif
threadpool.c
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>
#define THREADSTACKSIZE 8388608
#define INITSTACKSIZE 1024 //initial value for how many Tasks can be in the taskstack
#define STACKMEMMULT 2 //if the TaskStack is full, multiply by this
/* Growable LIFO stack of pending tasks. */
typedef struct TaskStack {
Future **start; // array of pointers to the pending Futures
size_t size; // capacity of start, counted in elements
long current; // index of the top element; -1 when the stack is empty
} TaskStack;
/* State of the single global thread pool. */
typedef struct ThreadPool {
size_t size; // number of worker threads that tpRelease() cancels and joins
pthread_t *threads; // worker thread handles, allocated in tpInit()
TaskStack *stack; // shared stack of pending tasks
} ThreadPool;
static pthread_mutex_t stackAccess;
static ThreadPool *tp;
/* Sleeps for roughly `nano` nanoseconds.
 * The delay is split into whole seconds and a sub-second remainder because
 * nanosleep() rejects tv_nsec values of one second or more with EINVAL.
 * An interrupted or failed sleep is not retried. */
void nsleep(unsigned long nano) {
    const unsigned long nsPerSec = 1000000000UL;
    struct timespec delay = {
        .tv_sec = (time_t)(nano / nsPerSec),
        .tv_nsec = (long)(nano % nsPerSec)
    };
    nanosleep(&delay, NULL);
}
/* Pushes `future` onto the shared task stack, growing the array when it is full.
 * Holds stackAccess for the whole operation. Aborts the process if the array
 * cannot be grown, because a silently dropped task would never be fulfilled. */
static void push(Future *future){
    pthread_mutex_lock(&stackAccess);
    TaskStack *stack = tp->stack;
    /* current is the index of the top element (-1 when empty), so the new
     * element goes one slot above it. */
    size_t slot = (size_t)(stack->current + 1);
    if (slot >= stack->size) {
        size_t grownSize = stack->size * 2;
        /* realloc() takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, grownSize * sizeof *grown);
        if (grown == NULL) {
            fprintf(stderr, "push: out of memory while growing the task stack\n");
            abort();
        }
        stack->start = grown;
        stack->size = grownSize;
    }
    stack->start[slot] = future;
    stack->current = (long)slot;
    pthread_mutex_unlock(&stackAccess);
}
/* Pops the most recently pushed task, retrying until one is available.
 * While the stack is empty the lock is dropped, a pending cancellation
 * request is acted upon, and the CPU is yielded before the next attempt.
 * This call never returns without a task, so a caller that is really waiting
 * for something else stays in here for as long as the stack is empty. */
static Future *pull(){
    for (;;) {
        pthread_mutex_lock(&stackAccess);
        if (tp->stack->current != -1) {
            break;
        }
        pthread_mutex_unlock(&stackAccess);
        pthread_testcancel();
        sched_yield();
    }
    /* stackAccess is still held at this point. */
    Future *top = tp->stack->start[tp->stack->current--];
    pthread_mutex_unlock(&stackAccess);
    return top;
}
/* Worker entry point: repeatedly takes a task, runs it and marks it fulfilled.
 * `args` is unused. The loop has no exit condition; the thread only ends when
 * a cancellation request is acted upon at a cancellation point. */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (;;) {
        Future *task = pull();
        task->fn(task);
        task->fulfilled = true;
        pthread_testcancel();
    }
    return NULL;
}
/* Creates the global thread pool with `size` worker threads.
 * Allocates the pool, its task stack (INITSTACKSIZE slots) and the thread
 * array, then starts the workers with a THREADSTACKSIZE-byte stack each.
 * tp->size counts the workers that were actually started, which is what
 * tpRelease() uses to cancel and join them.
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * step code (0-6 for the setup steps, 20+i when creating worker i failed),
 * stops the workers that were already started, releases everything that was
 * allocated and returns -1. */
int tpInit(size_t size) {
    int err;
    int attrReady = 0;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the error path can always clean up. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = 1;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each worker as soon as it exists so it can be joined later. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Workers use the stack, so stop them before freeing it. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
            free(tp->stack);
        }
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}
void tpRelease(void) {
for(int i=0; i<tp->size; i++){
pthread_cancel(tp->threads[i]);
pthread_join(tp->threads[i], NULL);
}
free(tp->stack->start);
free(tp->stack);
free(tp->threads);
free(tp);
}
/* Queues `future` for execution: clears its fulfilled flag, then pushes it
 * onto the task stack. */
void tpAsync(Future *future) {
    future->fulfilled = false;
    push(future);
}
/* Returns once `future` is fulfilled. Until then the caller keeps taking
 * tasks from the stack and running them itself. */
void tpAwait(Future *future) {
    for (;;) {
        if (future->fulfilled) {
            return;
        }
        Future *task = pull();
        task->fn(task);
        task->fulfilled = true;
    }
}
main.c
#include "threadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
static TASK(long, fib, long);
/* Naive recursive Fibonacci used to exercise the pool: both sub-problems
 * are submitted as tasks and then awaited. */
long fib(long n) {
    if (n < 2) {
        return n;
    }
    /* The futures live in this stack frame until both have been awaited. */
    fib_fut left = fibFuture(n - 1);
    fib_fut right = fibFuture(n - 2);
    fibAsync(&left);
    fibAsync(&right);
    return fibAwait(&left) + fibAwait(&right);
}
/* Starts an 8-thread pool, registers its teardown for process exit and
 * prints fib(0) through fib(100). */
int main() {
    if (tpInit(8) != 0) {
        perror("Thread Pool initialization failed");
        exit(-1);
    }
    atexit(&tpRelease);
    long i = 0;
    while (i <= 100) {
        printf("fib(%2li) = %li\n", i, fib(i));
        ++i;
    }
    return 0;
}
Makefile
#!/usr/bin/make
# Builds the `threadpool` binary from the .c files in this directory.
# NOTE(review): the `pack` target and the RMFILES / PCK variables are referenced
# but not defined in the part shown here -- confirm they exist elsewhere.
.SUFFIXES:
.PHONY: all run pack clean
SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool
# -MMD -MP make the compiler write a .d dependency file for each object.
CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP
LFLAGS = -pthread
DEP = $(OBJ:%.o=%.d)
# Pull in the generated dependency files; the leading '-' ignores missing ones.
-include $(DEP)
# Compile one source file into one object file.
%.o: %.c
$(CC) $(CFLAGS) $< -o $@
# Link every object except quicksort.o into the final binary.
$(TAR): $(filter-out quicksort.o,$(OBJ))
$(CC) $(LFLAGS) -o $@ $^
all: $(TAR)
# Build, then run the binary.
run: all
./$(TAR)
# Remove build products.
clean:
$(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)
我真的希望你知道发生了什么。提前谢谢你。
在 Craig Estey 和 Amit 的慷慨帮助下(您可以在原文 post 下的评论中看到),我弄明白了。
所以最终这确实是一个死锁。正如您在原始 post 中仍然可以看到的那样——我不会修改它,这样任何感兴趣的人都有机会看到我犯的错误。
发生这种情况是因为在某一时刻会有 6 个线程在 pull() 中等待,而堆栈为空;剩下的两个线程中,一个正要进入等待状态,另一个刚刚执行完分配给它的函数,而这个函数不会再递归调用其他任务(在我们的示例中即 fib(0) 或 fib(1))。现在,其中进入等待状态的那个线程(我们称它为线程 7)进入 fib_await(),检查它所等待的值是否已完成;此时尚未完成,于是它去检查堆栈中是否还有其他任务。因为堆栈中没有任何任务,它就卡在了等待中。
现在另一个线程(线程 8),也就是刚刚执行完其函数的那个线程,将它的 future 标记为已完成,并尝试拉取下一个 future。因为堆栈是空的,所以它也会停留在 pull() 中。
现在所有线程都卡在 pull() 中,没有一个可以继续执行,因为正在等待另一个任务结果的线程必须先离开 pull()。
我唯一的修改是 pull()、push()、tpAwait()、tpInit() 和 workerThread(),因为我还实现了一个非常简单的票证锁。
threadpool.c
/* Resets the ticket lock: no ticket has been handed out yet and ticket 0
 * is the first one to be served. */
static void ticketLockInit(){
    atomic_init(&nextTicket, 0);
    atomic_init(&nowServing, 0);
}
static inline void ticketLockAcquire(){
atomic_long myTicket=atomic_fetch_add(&nextTicket,1);
while(myTicket!=nowServing){
nsleep(1);
}
}
/* Releases the lock by advancing to the next ticket. */
static inline void ticketLockRelease(){
    atomic_fetch_add(&nowServing, 1);
}
/* Pushes `future` onto the shared task stack under the ticket lock, growing
 * the array when it is full. Aborts the process if the array cannot be
 * grown, because a silently dropped task would never be fulfilled. */
static void push(Future *future){
    ticketLockAcquire();
    TaskStack *stack = tp->stack;
    /* current is the index of the top element (-1 when empty), so the new
     * element goes one slot above it. */
    size_t slot = (size_t)(stack->current + 1);
    if (slot >= stack->size) {
        fprintf(stderr, "MemRealloc\n");
        size_t grownSize = stack->size * 2;
        /* realloc() takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, grownSize * sizeof *grown);
        if (grown == NULL) {
            fprintf(stderr, "push: out of memory while growing the task stack\n");
            abort();
        }
        stack->start = grown;
        stack->size = grownSize;
    }
    stack->start[slot] = future;
    stack->current = (long)slot;
    ticketLockRelease();
}
static Future *pull(){
Future *retVal=NULL;
ticketLockAcquire();
if(tp->stack->current>-1){ //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
retVal=tp->stack->start[tp->stack->current];
tp->stack->current--;
}
ticketLockRelease();
return retVal;
}
/* Worker entry point: repeatedly tries to take a task; when it gets one it
 * runs it and marks it fulfilled. `args` is unused. The loop has no exit
 * condition; the thread only ends when a cancellation request is acted upon. */
static void *workerThread(void *args){
    (void)args;
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (;;) {
        Future *fut = pull();
        if (fut != NULL) {
            fut->fn(fut);
            fut->fulfilled = true;
        }
        /* Checked on every pass, not only after a task, so that an idle
         * worker also reaches a cancellation point. */
        pthread_testcancel();
    }
    return NULL;
}
/* Returns once `future` is fulfilled. Until then the caller keeps trying to
 * take tasks from the stack and runs each one it gets; an empty stack just
 * leads to another check of `future`. */
void tpAwait(Future *future) {
    while (!future->fulfilled) {
        Future *task = pull();
        if (task == NULL) {
            continue;
        }
        task->fn(task);
        task->fulfilled = true;
        pthread_testcancel();
    }
}
/* Creates the global thread pool with `size` worker threads.
 * Initialises the locks, allocates the pool, its task stack (INITSTACKSIZE
 * slots) and the thread array, then starts the workers with a
 * THREADSTACKSIZE-byte stack each.
 * tp->size counts the workers that were actually started, which is what
 * tpRelease() uses to cancel and join them; that in turn relies on the
 * workers reaching a cancellation point.
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * step code (0-6 for the setup steps, 20+i when creating worker i failed),
 * stops the workers that were already started, releases everything that was
 * allocated and returns -1. */
int tpInit(size_t size) {
    int err;
    int attrReady = 0;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);
    ticketLockInit();

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the error path can always clean up. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = 1;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each worker as soon as it exists so it can be joined later. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Workers use the stack, so stop them before freeing it. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
            free(tp->stack);
        }
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}
我正在用 C 实现我自己的通用 Threadpool 算法,使用斐波那契数列进行测试,在过去的几天里,我一直被一个完全困扰我的问题所困扰。
当执行程序时,它会一直正常工作,直到某个时刻突然停止,而我看不出任何明显的原因。
我注意到的一件事是执行会在一小段时间后停止,因为如果向其中添加打印命令或睡眠命令,它会在执行中提前停止。
编辑:之前漏掉了这一点:我已经检查过死锁,并没有发现死锁;看起来只是在某个时刻不再有新任务被压入堆栈,导致所有线程都尝试从堆栈中取任务,发现堆栈为空后又跳回开头,无限地重复这一过程。
代码如下:
threadpool.h
#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <stddef.h>
#include <stdbool.h>
typedef void (*ThreadTask_f)(void*);
/* Handle for one unit of work submitted to the pool. */
typedef struct Future {
ThreadTask_f fn; // function to execute; the pool calls it with a pointer to this Future
bool fulfilled; // set to true by the executing thread after fn returns; polled by tpAwait()
// NOTE(review): fulfilled is a plain bool written by one thread and polled by
// another without synchronization -- consider an atomic type; confirm.
} Future;
extern int tpInit(size_t size);
extern void tpRelease(void);
extern void tpAsync(Future *future);
extern void tpAwait(Future *future);
/* Generates a typed wrapper around the thread pool for one function.
 * TYPE: type that the function returns
 * NAME: name of the function to be parallelised
 * ARG: type of the single argument the function takes
 *
 * The expansion provides:
 *   - a prototype `TYPE NAME(ARG)`
 *   - `NAME_fut`: struct bundling the Future, the argument and the result
 *   - `NAMEThunk`: adapter with the ThreadTask_f signature; it treats the
 *     pointer it receives as a NAME_fut, which works because `fut` is the
 *     first member of that struct
 *   - `NAMEFuture(arg)`: builds an unfulfilled NAME_fut by value
 *   - `NAMEAsync(f)`: submits f through tpAsync() and returns f
 *   - `NAMEAwait(f)`: waits through tpAwait() and returns f->res
 */
#define TASK(TYPE, NAME, ARG) \
TYPE NAME(ARG); \
\
typedef struct { \
Future fut; \
ARG arg; \
TYPE res; \
} NAME ## _fut; \
\
static void NAME ## Thunk(void *args) { \
NAME ## _fut *data = args; \
data->res = NAME(data->arg); \
} \
static inline NAME ## _fut NAME ## Future(ARG arg) { \
return (NAME ## _fut) { \
.fut = { .fn = &NAME ## Thunk, .fulfilled = false }, \
.arg = arg \
}; \
} \
static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
tpAsync(&future->fut); \
return future; \
} \
static inline TYPE NAME ## Await(NAME ## _fut *future) { \
tpAwait(&future->fut); \
return future->res; \
}
#endif
threadpool.c
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>
#define THREADSTACKSIZE 8388608
#define INITSTACKSIZE 1024 //initial value for how many Tasks can be in the taskstack
#define STACKMEMMULT 2 //if the TaskStack is full, multiply by this
/* Growable LIFO stack of pending tasks. */
typedef struct TaskStack {
Future **start; // array of pointers to the pending Futures
size_t size; // capacity of start, counted in elements
long current; // index of the top element; -1 when the stack is empty
} TaskStack;
/* State of the single global thread pool. */
typedef struct ThreadPool {
size_t size; // number of worker threads that tpRelease() cancels and joins
pthread_t *threads; // worker thread handles, allocated in tpInit()
TaskStack *stack; // shared stack of pending tasks
} ThreadPool;
static pthread_mutex_t stackAccess;
static ThreadPool *tp;
/* Sleeps for roughly `nano` nanoseconds.
 * The delay is split into whole seconds and a sub-second remainder because
 * nanosleep() rejects tv_nsec values of one second or more with EINVAL.
 * An interrupted or failed sleep is not retried. */
void nsleep(unsigned long nano) {
    const unsigned long nsPerSec = 1000000000UL;
    struct timespec delay = {
        .tv_sec = (time_t)(nano / nsPerSec),
        .tv_nsec = (long)(nano % nsPerSec)
    };
    nanosleep(&delay, NULL);
}
/* Pushes `future` onto the shared task stack, growing the array when it is full.
 * Holds stackAccess for the whole operation. Aborts the process if the array
 * cannot be grown, because a silently dropped task would never be fulfilled. */
static void push(Future *future){
    pthread_mutex_lock(&stackAccess);
    TaskStack *stack = tp->stack;
    /* current is the index of the top element (-1 when empty), so the new
     * element goes one slot above it. */
    size_t slot = (size_t)(stack->current + 1);
    if (slot >= stack->size) {
        size_t grownSize = stack->size * 2;
        /* realloc() takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, grownSize * sizeof *grown);
        if (grown == NULL) {
            fprintf(stderr, "push: out of memory while growing the task stack\n");
            abort();
        }
        stack->start = grown;
        stack->size = grownSize;
    }
    stack->start[slot] = future;
    stack->current = (long)slot;
    pthread_mutex_unlock(&stackAccess);
}
/* Pops the most recently pushed task, retrying until one is available.
 * While the stack is empty the lock is dropped, a pending cancellation
 * request is acted upon, and the CPU is yielded before the next attempt.
 * This call never returns without a task, so a caller that is really waiting
 * for something else stays in here for as long as the stack is empty. */
static Future *pull(){
    for (;;) {
        pthread_mutex_lock(&stackAccess);
        if (tp->stack->current != -1) {
            break;
        }
        pthread_mutex_unlock(&stackAccess);
        pthread_testcancel();
        sched_yield();
    }
    /* stackAccess is still held at this point. */
    Future *top = tp->stack->start[tp->stack->current--];
    pthread_mutex_unlock(&stackAccess);
    return top;
}
/* Worker entry point: repeatedly takes a task, runs it and marks it fulfilled.
 * `args` is unused. The loop has no exit condition; the thread only ends when
 * a cancellation request is acted upon at a cancellation point. */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (;;) {
        Future *task = pull();
        task->fn(task);
        task->fulfilled = true;
        pthread_testcancel();
    }
    return NULL;
}
/* Creates the global thread pool with `size` worker threads.
 * Allocates the pool, its task stack (INITSTACKSIZE slots) and the thread
 * array, then starts the workers with a THREADSTACKSIZE-byte stack each.
 * tp->size counts the workers that were actually started, which is what
 * tpRelease() uses to cancel and join them.
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * step code (0-6 for the setup steps, 20+i when creating worker i failed),
 * stops the workers that were already started, releases everything that was
 * allocated and returns -1. */
int tpInit(size_t size) {
    int err;
    int attrReady = 0;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the error path can always clean up. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = 1;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each worker as soon as it exists so it can be joined later. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Workers use the stack, so stop them before freeing it. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
            free(tp->stack);
        }
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}
void tpRelease(void) {
for(int i=0; i<tp->size; i++){
pthread_cancel(tp->threads[i]);
pthread_join(tp->threads[i], NULL);
}
free(tp->stack->start);
free(tp->stack);
free(tp->threads);
free(tp);
}
/* Queues `future` for execution: clears its fulfilled flag, then pushes it
 * onto the task stack. */
void tpAsync(Future *future) {
    future->fulfilled = false;
    push(future);
}
/* Returns once `future` is fulfilled. Until then the caller keeps taking
 * tasks from the stack and running them itself. */
void tpAwait(Future *future) {
    for (;;) {
        if (future->fulfilled) {
            return;
        }
        Future *task = pull();
        task->fn(task);
        task->fulfilled = true;
    }
}
main.c
#include "threadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
static TASK(long, fib, long);
/* Naive recursive Fibonacci used to exercise the pool: both sub-problems
 * are submitted as tasks and then awaited. */
long fib(long n) {
    if (n < 2) {
        return n;
    }
    /* The futures live in this stack frame until both have been awaited. */
    fib_fut left = fibFuture(n - 1);
    fib_fut right = fibFuture(n - 2);
    fibAsync(&left);
    fibAsync(&right);
    return fibAwait(&left) + fibAwait(&right);
}
/* Starts an 8-thread pool, registers its teardown for process exit and
 * prints fib(0) through fib(100). */
int main() {
    if (tpInit(8) != 0) {
        perror("Thread Pool initialization failed");
        exit(-1);
    }
    atexit(&tpRelease);
    long i = 0;
    while (i <= 100) {
        printf("fib(%2li) = %li\n", i, fib(i));
        ++i;
    }
    return 0;
}
Makefile
#!/usr/bin/make
# Builds the `threadpool` binary from the .c files in this directory.
# NOTE(review): the `pack` target and the RMFILES / PCK variables are referenced
# but not defined in the part shown here -- confirm they exist elsewhere.
.SUFFIXES:
.PHONY: all run pack clean
SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool
# -MMD -MP make the compiler write a .d dependency file for each object.
CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP
LFLAGS = -pthread
DEP = $(OBJ:%.o=%.d)
# Pull in the generated dependency files; the leading '-' ignores missing ones.
-include $(DEP)
# Compile one source file into one object file.
%.o: %.c
$(CC) $(CFLAGS) $< -o $@
# Link every object except quicksort.o into the final binary.
$(TAR): $(filter-out quicksort.o,$(OBJ))
$(CC) $(LFLAGS) -o $@ $^
all: $(TAR)
# Build, then run the binary.
run: all
./$(TAR)
# Remove build products.
clean:
$(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)
我真的希望你知道发生了什么。提前谢谢你。
在 Craig Estey 和 Amit 的慷慨帮助下(您可以在原文 post 下的评论中看到),我弄明白了。
所以最终这确实是一个死锁。正如您在原始 post 中仍然可以看到的那样——我不会修改它,这样任何感兴趣的人都有机会看到我犯的错误。
发生这种情况是因为在某一时刻会有 6 个线程在 pull() 中等待,而堆栈为空;剩下的两个线程中,一个正要进入等待状态,另一个刚刚执行完分配给它的函数,而这个函数不会再递归调用其他任务(在我们的示例中即 fib(0) 或 fib(1))。现在,其中进入等待状态的那个线程(我们称它为线程 7)进入 fib_await(),检查它所等待的值是否已完成;此时尚未完成,于是它去检查堆栈中是否还有其他任务。因为堆栈中没有任何任务,它就卡在了等待中。
现在另一个线程(线程 8),也就是刚刚执行完其函数的那个线程,将它的 future 标记为已完成,并尝试拉取下一个 future。因为堆栈是空的,所以它也会停留在 pull() 中。
现在所有线程都卡在 pull() 中,没有一个可以继续执行,因为正在等待另一个任务结果的线程必须先离开 pull()。
我唯一的修改是 pull()、push()、tpAwait()、tpInit() 和 workerThread(),因为我还实现了一个非常简单的票证锁。
threadpool.c
/* Resets the ticket lock: no ticket has been handed out yet and ticket 0
 * is the first one to be served. */
static void ticketLockInit(){
    atomic_init(&nextTicket, 0);
    atomic_init(&nowServing, 0);
}
static inline void ticketLockAcquire(){
atomic_long myTicket=atomic_fetch_add(&nextTicket,1);
while(myTicket!=nowServing){
nsleep(1);
}
}
/* Releases the lock by advancing to the next ticket. */
static inline void ticketLockRelease(){
    atomic_fetch_add(&nowServing, 1);
}
/* Pushes `future` onto the shared task stack under the ticket lock, growing
 * the array when it is full. Aborts the process if the array cannot be
 * grown, because a silently dropped task would never be fulfilled. */
static void push(Future *future){
    ticketLockAcquire();
    TaskStack *stack = tp->stack;
    /* current is the index of the top element (-1 when empty), so the new
     * element goes one slot above it. */
    size_t slot = (size_t)(stack->current + 1);
    if (slot >= stack->size) {
        fprintf(stderr, "MemRealloc\n");
        size_t grownSize = stack->size * 2;
        /* realloc() takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, grownSize * sizeof *grown);
        if (grown == NULL) {
            fprintf(stderr, "push: out of memory while growing the task stack\n");
            abort();
        }
        stack->start = grown;
        stack->size = grownSize;
    }
    stack->start[slot] = future;
    stack->current = (long)slot;
    ticketLockRelease();
}
static Future *pull(){
Future *retVal=NULL;
ticketLockAcquire();
if(tp->stack->current>-1){ //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
retVal=tp->stack->start[tp->stack->current];
tp->stack->current--;
}
ticketLockRelease();
return retVal;
}
/* Worker entry point: repeatedly tries to take a task; when it gets one it
 * runs it and marks it fulfilled. `args` is unused. The loop has no exit
 * condition; the thread only ends when a cancellation request is acted upon. */
static void *workerThread(void *args){
    (void)args;
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (;;) {
        Future *fut = pull();
        if (fut != NULL) {
            fut->fn(fut);
            fut->fulfilled = true;
        }
        /* Checked on every pass, not only after a task, so that an idle
         * worker also reaches a cancellation point. */
        pthread_testcancel();
    }
    return NULL;
}
/* Returns once `future` is fulfilled. Until then the caller keeps trying to
 * take tasks from the stack and runs each one it gets; an empty stack just
 * leads to another check of `future`. */
void tpAwait(Future *future) {
    while (!future->fulfilled) {
        Future *task = pull();
        if (task == NULL) {
            continue;
        }
        task->fn(task);
        task->fulfilled = true;
        pthread_testcancel();
    }
}
/* Creates the global thread pool with `size` worker threads.
 * Initialises the locks, allocates the pool, its task stack (INITSTACKSIZE
 * slots) and the thread array, then starts the workers with a
 * THREADSTACKSIZE-byte stack each.
 * tp->size counts the workers that were actually started, which is what
 * tpRelease() uses to cancel and join them; that in turn relies on the
 * workers reaching a cancellation point.
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * step code (0-6 for the setup steps, 20+i when creating worker i failed),
 * stops the workers that were already started, releases everything that was
 * allocated and returns -1. */
int tpInit(size_t size) {
    int err;
    int attrReady = 0;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);
    ticketLockInit();

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the error path can always clean up. */
    tp->size = 0;
    tp->threads = NULL;
    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    tp->threads = malloc(sizeof *tp->threads * size);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }
    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = 1;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }
    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each worker as soon as it exists so it can be joined later. */
        tp->size = i + 1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Workers use the stack, so stop them before freeing it. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
            free(tp->stack);
        }
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}