线程池执行在 C 中的任意点停止

Threadpool execution stops at an arbitrary point in C

我正在用 C 实现自己的通用线程池(Threadpool),并用斐波那契数列来测试它。过去几天里,我一直被一个让我完全摸不着头脑的问题困住。

程序执行时一开始完全正常,但运行到某个时刻会突然停止,而我看不出任何明显的原因。

我注意到的一点是:程序似乎是在运行了一小段时间之后停止的,因为如果在其中加入打印语句或 sleep 调用,它会在更早的执行阶段就停下来。

编辑:之前漏掉了这一点——我已经检查过是否存在死锁,结果是没有。看起来只是到了某个时刻不再有任何新任务被压入栈,于是所有线程都试图从栈中取任务,发现栈是空的,然后跳回去无限重复这个过程。

代码如下:

threadpool.h

#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED

#include <stddef.h>
#include <stdbool.h>

/* Signature of a task entry point; the pool invokes it with a pointer to the
 * Future that was queued. */
typedef void (*ThreadTask_f)(void*);

/* Handle for one unit of work submitted to the pool. */
typedef struct Future {
    ThreadTask_f fn;   // function the pool calls to run this task
    bool fulfilled;    // set to true by the executing thread once fn has returned
} Future;


/* Allocates the pool and starts `size` worker threads.
 * Returns 0 on success and -1 on failure. */
extern int tpInit(size_t size);

/* Cancels and joins the worker threads recorded in the pool, then frees the
 * pool's memory. */
extern void tpRelease(void);

/* Marks `future` as not fulfilled and queues it for execution; does not wait
 * for the task to run. */
extern void tpAsync(Future *future);

/* Returns once `future->fulfilled` is set; while waiting, the calling thread
 * takes queued tasks and runs them itself. */
extern void tpAwait(Future *future);

/* Generates the boilerplate needed to run a one-argument function on the
 * thread pool.
 * TYPE: type that the function returns
 * NAME: name of the function to be parallelised
 * ARG:  type of the function's single argument
 *
 * The expansion declares `TYPE NAME(ARG)` and defines:
 *   NAME_fut   - struct bundling the Future, the argument and the result
 *   NAMEThunk  - adapter stored in Future.fn; calls NAME and stores its result
 *   NAMEFuture - builds a NAME_fut for a given argument, marked not fulfilled
 *   NAMEAsync  - queues the future via tpAsync and returns it
 *   NAMEAwait  - waits via tpAwait and returns the stored result
 * NAMEThunk converts the pointer it is handed to NAME_fut*, while NAMEAsync
 * queues the address of the `fut` member, so `fut` must remain the first
 * member of the struct.
*/
#define TASK(TYPE, NAME, ARG) \
    TYPE NAME(ARG); \
    \
    typedef struct { \
        Future fut;  \
        ARG    arg;  \
        TYPE   res;  \
    } NAME ## _fut;  \
    \
    static void NAME ## Thunk(void *args) { \
        NAME ## _fut *data = args;          \
        data->res = NAME(data->arg);        \
    } \
    static inline NAME ## _fut NAME ## Future(ARG arg) { \
        return (NAME ## _fut) {                          \
            .fut = { .fn = &NAME ## Thunk, .fulfilled = false },             \
            .arg = arg                                   \
        };                                               \
    } \
    static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
        tpAsync(&future->fut);                 \
        return future;                         \
    } \
    static inline TYPE NAME ## Await(NAME ## _fut *future) { \
        tpAwait(&future->fut);        \
        return future->res;           \
    }

#endif

threadpool.c


#include "threadpool.h"

#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>

#define THREADSTACKSIZE 8388608  //stack size in bytes requested for every worker thread (8 MiB)
#define INITSTACKSIZE 1024  //initial value for how many Tasks can be in the taskstack
#define STACKMEMMULT 2  //growth factor intended for the TaskStack when it is full


/* LIFO store of the tasks that have been queued but not yet taken by a thread. */
typedef struct TaskStack {
    Future **start;   // heap array holding the queued Future pointers
    size_t size;      // capacity of `start`, counted in Future pointers
    long current;     // index of the top element; -1 while the stack is empty
} TaskStack;

/* The pool itself: the worker thread handles plus the stack of pending tasks. */
typedef struct ThreadPool {
    size_t size;         // number of entries of `threads` that tpRelease cancels and joins
    pthread_t *threads;  // array of worker thread handles, allocated in tpInit
    TaskStack *stack;    // stack of pending tasks
} ThreadPool;

// Mutex serialising access to the task stack; initialised in tpInit.
static pthread_mutex_t stackAccess;

// The single pool instance; allocated by tpInit and freed by tpRelease.
static ThreadPool *tp;

/* Suspends the calling thread for `nano` nanoseconds.
 *
 * nanosleep() rejects a tv_nsec of one second or more with EINVAL, so the
 * request is split into whole seconds plus the sub-second remainder.
 * The sleep is best effort: an early wake-up (e.g. by a signal) is not retried.
 */
void nsleep(unsigned long nano) {
    const unsigned long nanosPerSecond = 1000000000UL;
    struct timespec delay = {
        .tv_sec = (time_t)(nano / nanosPerSecond),
        .tv_nsec = (long)(nano % nanosPerSecond)
    };
    nanosleep(&delay, NULL);
}

/* Pushes `future` onto the shared task stack, growing the stack when full.
 *
 * The slot index is computed before anything is written, so the array is
 * enlarged as soon as the next slot would lie past the end, and realloc is
 * given a byte count (element count times pointer size).
 *
 * If the stack cannot be grown, the task is not dropped (a later tpAwait on it
 * would never return): it is executed on the calling thread instead, after the
 * lock has been released.
 */
static void push(Future *future){
    pthread_mutex_lock(&stackAccess);
        size_t next=(size_t)(tp->stack->current+1);
        if(next==tp->stack->size){
            size_t grownSize=tp->stack->size*2;
            // keep the old pointer until realloc is known to have succeeded
            Future **grown=realloc(tp->stack->start, grownSize*sizeof *grown);
            if(grown==NULL){
                pthread_mutex_unlock(&stackAccess);
                future->fn(future);
                future->fulfilled=true;
                return;
            }
            tp->stack->start=grown;
            tp->stack->size=grownSize;
        }
        tp->stack->start[next]=future;
        tp->stack->current=(long)next;
    pthread_mutex_unlock(&stackAccess);
}

/* Removes and returns the task on top of the shared stack.
 *
 * Never returns NULL: while the stack is empty the function releases the lock,
 * offers a cancellation point, yields the processor and tries again.
 */
static Future *pull(){
    for(;;){
        pthread_mutex_lock(&stackAccess);
        if(tp->stack->current!=-1){
            Future *top=tp->stack->start[tp->stack->current];
            tp->stack->current--;
            pthread_mutex_unlock(&stackAccess);
            return top;
        }
        // nothing queued: drop the lock before waiting so other threads can push
        pthread_mutex_unlock(&stackAccess);
        pthread_testcancel();
        sched_yield();
    }
}

/* Entry point of every pool thread.
 *
 * Loops forever: takes a task with pull(), runs it, marks it fulfilled and
 * then offers a cancellation point. `args` is not used.
 */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    for(;;){
        Future *task=pull();
        task->fn(task);
        task->fulfilled=true;
        pthread_testcancel();
    }

    return NULL;
}

/* Creates the global thread pool and starts `size` worker threads.
 *
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * error code (0-3: allocation, 4-6: thread attributes, 20+i: creation of
 * worker i), undoes everything done so far and returns -1.
 *
 * tp->size counts the workers that were actually created, so that tpRelease
 * (and the error path here) cancels and joins exactly those threads.
 */
int tpInit(size_t size) {

    int err;
    int attrReady=0;   // set once `attributes` must be destroyed
    pthread_attr_t attributes;

    tp=NULL;
    pthread_mutex_init(&stackAccess, NULL);

    tp=malloc(sizeof *tp);
    if(tp==NULL){
        err=0;
        goto ERRHANDLINIT;
    }
    // give every member a defined value so the error path can free blindly
    tp->size=0;
    tp->threads=NULL;
    tp->stack=malloc(sizeof *tp->stack);
    if(tp->stack==NULL){
        err=1;
        goto ERRHANDLINIT;
    }
    tp->stack->start=NULL;
    tp->stack->current=-1;
    tp->stack->size=0;
    tp->threads=malloc(size*sizeof *tp->threads);
    if(tp->threads==NULL){
        err=2;
        goto ERRHANDLINIT;
    }
    tp->stack->start=malloc(INITSTACKSIZE*sizeof *tp->stack->start);
    if(tp->stack->start==NULL){
        err=3;
        goto ERRHANDLINIT;
    }
    tp->stack->size=INITSTACKSIZE;

    if(pthread_attr_init(&attributes)!=0){
        err=4;
        goto ERRHANDLINIT;
    }
    attrReady=1;
    if(pthread_attr_setstacksize(&attributes, THREADSTACKSIZE)!=0){
        err=5;
        goto ERRHANDLINIT;
    }

    if(pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE)!=0){
        err=6;
        goto ERRHANDLINIT;
    }

    for(size_t i=0; i<size; i++){
        if(pthread_create(&(tp->threads[i]), &attributes, workerThread,NULL)!=0){
            err=20+(int)i;
            goto ERRHANDLINIT;
        }
        tp->size=i+1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

    ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if(attrReady){
        pthread_attr_destroy(&attributes);
    }
    if(tp!=NULL){
        // stop the workers that were already started before freeing what they use
        for(size_t i=0; i<tp->size; i++){
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if(tp->stack!=NULL){
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp=NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}

/* Cancels and joins the tp->size worker threads recorded in the pool, then
 * frees the task stack, the thread array and the pool itself.
 *
 * Does nothing when no pool exists (tp is NULL), so it is safe to call when
 * the pool was never created.
 */
void tpRelease(void) {
    if(tp==NULL){
        return;
    }
    for(size_t i=0; i<tp->size; i++){
        pthread_cancel(tp->threads[i]);
        pthread_join(tp->threads[i], NULL);
    }
    free(tp->stack->start);
    free(tp->stack);
    free(tp->threads);
    free(tp);
}

/* Submits `future` to the pool.
 * The fulfilled flag is cleared first, so a Future object can be submitted
 * again after an earlier run.
 */
void tpAsync(Future *future) {
    future->fulfilled = false;
    push(future);
}

void tpAwait(Future *future) {
    while(!future->fulfilled){
        Future *workFut=pull();
        workFut->fn(workFut);
        workFut->fulfilled=true;
    }
}

main.c

#include "threadpool.h"

#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>


/* Generates fib_fut, fibThunk, fibFuture, fibAsync and fibAwait for fib();
 * the leading `static` applies to the declaration of fib that the macro emits
 * first. */
static TASK(long, fib, long);

/* Computes the n-th Fibonacci number by queueing both sub-problems on the
 * thread pool and awaiting their results. Values of n up to 1 are returned
 * unchanged. The two futures live on this call's stack until both awaits have
 * returned.
 */
long fib(long n) {
    if (n <= 1){
        return n;
    }

    fib_fut left = fibFuture(n - 1);
    fib_fut right = fibFuture(n - 2);

    fib_fut *a = fibAsync(&left);
    fib_fut *b = fibAsync(&right);

    return fibAwait(a) + fibAwait(b);
}

/* Test driver: starts an 8-thread pool and prints the Fibonacci numbers from
 * fib(0) up to at most fib(100).
 *
 * The loop stops early as soon as the next number would no longer fit in a
 * long, because computing it would overflow a signed integer (undefined
 * behaviour). The check uses the two previously printed values.
 */
int main(void) {
    if (tpInit(8) != 0) {
        perror("Thread Pool initialization failed");
        exit(-1);
    }
    atexit(&tpRelease);

    long prev = 0;      /* fib(i - 1) once i >= 1 */
    long prevPrev = 0;  /* fib(i - 2) once i >= 2 */
    for (long i = 0; i <= 100; ++i) {
        if (i >= 2 && prev > LONG_MAX - prevPrev) {
            break;  /* fib(i) = prev + prevPrev would overflow long */
        }
        long value = fib(i);
        printf("fib(%2li) = %li\n", i, value);
        prevPrev = prev;
        prev = value;
    }

    return 0;
}

Makefile

#!/usr/bin/make
# Builds the `threadpool` test binary from every .c file in this directory.
# Recipe lines below start with a TAB character, as make requires.
.SUFFIXES:
.PHONY: all run pack clean

SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool

# -pthread is passed when compiling as well as when linking.
CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP -pthread
LFLAGS = -pthread

DEP = $(OBJ:%.o=%.d)

# `all` is the first explicit rule so that it is the default goal.
all: $(TAR)

%.o: %.c
	$(CC) $(CFLAGS) $< -o $@

$(TAR): $(filter-out quicksort.o,$(OBJ))
	$(CC) $(LFLAGS) -o $@ $^

run: all
	./$(TAR)

clean:
	$(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)

# Generated dependency files are included last: rules read from an included
# file count when make picks the default goal, so including them before `all`
# would make the first object file the default target after the first build.
-include $(DEP)

我真的希望你知道发生了什么。提前谢谢你。

在 Craig Estey 和 Amit 的慷慨帮助下(可以在原帖下方的评论中看到),我终于弄明白了。

所以说到底,这确实是一个死锁。原帖中的代码仍然保留着,我不会去修改它,这样任何感兴趣的人都可以看到我犯的错误。

之所以会发生这种情况,是因为在某个时刻栈是空的,有 6 个线程正在等待拉取(pull)任务;剩下的两个线程中,一个正要进入等待(await)状态,另一个刚刚执行完交给它的函数,而这个函数没有再递归调用其他任务(在我们的例子中就是 fib(0) 或 fib(1))。现在,进入 fib_await() 的那个线程(我们称它为线程 7)会检查它所等待的值是否已经完成;此时还没有完成,于是它去检查栈中是否还有其他任务。因为栈里什么都没有,它就卡在了等待中。

而另一个线程,即刚刚执行完给定函数的线程 8,会把它的 future 标记为已完成,然后尝试去拉取下一个 future。由于栈是空的,它同样会停留在 pull 中。

此时所有线程都卡在 pull 里,没有任何一个能够继续前进,因为正在等待另一个线程结果的那个线程必须先离开 pull() 才行。

我只修改了 pull()、push()、tpAwait()、tpInit() 和 workerThread(),另外还实现了一个非常简单的票据锁(ticket lock)。

threadpool.c


/* Puts the ticket lock into its initial, unlocked state: the next ticket to
 * hand out and the ticket currently being served both start at 0. */
static void ticketLockInit(){
    atomic_init(&nextTicket, 0);
    atomic_init(&nowServing, 0);
}

/* Acquires the ticket lock: draws the next ticket number and then polls,
 * sleeping via nsleep(1) between checks, until that number is being served. */
static inline void ticketLockAcquire(){

    atomic_long myTicket=atomic_fetch_add(&nextTicket,1);

    for(;;){
        if(myTicket==nowServing){
            return;
        }
        nsleep(1);
    }

}

/* Releases the ticket lock by advancing the number being served, which lets
 * the holder of the next ticket proceed. */
static inline void ticketLockRelease(){
    nowServing+=1;
}

/* Pushes `future` onto the shared task stack under the ticket lock, growing
 * the stack when it is full.
 *
 * realloc is given a byte count (element count times pointer size); passing
 * the bare element count would shrink the buffer on the first growth.
 * The stack is only modified once the new buffer is known to exist.
 *
 * If the stack cannot be grown, the task is not dropped (a later tpAwait on it
 * would never return): it is executed on the calling thread instead, after the
 * lock has been released.
 */
static void push(Future *future){

    ticketLockAcquire();
        size_t next=(size_t)(tp->stack->current+1);
        if(next==tp->stack->size){
            fprintf(stderr, "MemRealloc\n");
            size_t grownSize=tp->stack->size*2;
            // keep the old pointer until realloc is known to have succeeded
            Future **grown=realloc(tp->stack->start, grownSize*sizeof *grown);
            if(grown==NULL){
                ticketLockRelease();
                future->fn(future);
                future->fulfilled=true;
                return;
            }
            tp->stack->start=grown;
            tp->stack->size=grownSize;
        }
        tp->stack->current=(long)next;
        tp->stack->start[next]=future;
    ticketLockRelease();

}

static Future *pull(){
    Future *retVal=NULL;

    ticketLockAcquire();
        if(tp->stack->current>-1){  //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
            retVal=tp->stack->start[tp->stack->current];
        tp->stack->current--;
        }
    ticketLockRelease();

    return retVal;
}

/* Entry point of every pool thread.
 *
 * Loops forever: runs a task whenever pull() delivers one and marks it
 * fulfilled. The cancellation point is reached on EVERY iteration, including
 * the idle ones, so that tpRelease/tpInit can cancel and join a worker that
 * has nothing to do. `args` is not used.
 */
static void *workerThread(void *args){
    (void)args;
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    Future *fut;
    while(true){
        if((fut=pull())!=NULL){
            fut->fn(fut);
            fut->fulfilled=true;
        }
        pthread_testcancel();
    }
    return NULL;
}

/* Blocks until `future` has been fulfilled. While waiting, the calling thread
 * keeps taking queued tasks and running them; when the stack is empty it goes
 * straight back to re-checking the flag.
 */
void tpAwait(Future *future) {
    while(!future->fulfilled){

        Future *workFut;
        if((workFut=pull())!=NULL){
            workFut->fn(workFut);
            workFut->fulfilled=true;
            pthread_testcancel();
        }

    }
}

/* Creates the global thread pool and starts `size` worker threads.
 *
 * Returns 0 on success. On failure it prints a diagnostic with an internal
 * error code (0-3: allocation, 4-6: thread attributes, 20+i: creation of
 * worker i), undoes everything done so far and returns -1.
 *
 * tp->size counts the workers that were actually created, so that tpRelease
 * (and the error path here) cancels and joins exactly those threads.
 */
int tpInit(size_t size) {

    int err;
    int attrReady=0;   // set once `attributes` must be destroyed
    pthread_attr_t attributes;

    tp=NULL;
    pthread_mutex_init(&stackAccess, NULL);
    ticketLockInit();

    tp=malloc(sizeof *tp);
    if(tp==NULL){
        err=0;
        goto ERRHANDLINIT;
    }
    // give every member a defined value so the error path can free blindly
    tp->size=0;
    tp->threads=NULL;
    tp->stack=malloc(sizeof *tp->stack);
    if(tp->stack==NULL){
        err=1;
        goto ERRHANDLINIT;
    }
    tp->stack->start=NULL;
    tp->stack->current=-1;
    tp->stack->size=0;
    tp->threads=malloc(size*sizeof *tp->threads);
    if(tp->threads==NULL){
        err=2;
        goto ERRHANDLINIT;
    }
    tp->stack->start=malloc(INITSTACKSIZE*sizeof *tp->stack->start);
    if(tp->stack->start==NULL){
        err=3;
        goto ERRHANDLINIT;
    }
    tp->stack->size=INITSTACKSIZE;

    if(pthread_attr_init(&attributes)!=0){
        err=4;
        goto ERRHANDLINIT;
    }
    attrReady=1;
    if(pthread_attr_setstacksize(&attributes, THREADSTACKSIZE)!=0){
        err=5;
        goto ERRHANDLINIT;
    }

    if(pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE)!=0){
        err=6;
        goto ERRHANDLINIT;
    }

    for(size_t i=0; i<size; i++){
        if(pthread_create(&(tp->threads[i]), &attributes, workerThread,NULL)!=0){
            err=20+(int)i;
            goto ERRHANDLINIT;
        }
        tp->size=i+1;
    }
    pthread_attr_destroy(&attributes);
    return 0;

    ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    if(attrReady){
        pthread_attr_destroy(&attributes);
    }
    if(tp!=NULL){
        // stop the workers that were already started before freeing what they use
        for(size_t i=0; i<tp->size; i++){
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if(tp->stack!=NULL){
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp=NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}