How can I replace mutexes with a suitable alternative when using Vivado HLS?

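Apologies in advance, as I am a beginner with Vivado HLS. I want to synthesize the code below, but Vivado tells me that mutexes and anything that depends on them cannot be used, and gives me the following errors.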

ERROR: [SYNCHK 200-11] Global Variable 'readyQMutex' has an unsynthesizable struct type '%union.pthread_mutex_t.2.12.22 = type { %struct.__pthread_mu...' (a member pointer to struct itself).
ERROR: [SYNCHK 200-71] ../fpga_top.c:221: function 'pthread_mutex_lock' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:225: function 'pthread_cond_wait' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:237: function 'pthread_cond_signal' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:238: function 'pthread_mutex_unlock' has no function body.
ERROR: [SYNCHK 200-11] ../fpga_top.c:18: Constant 'workerInfos' has an unsynthesizable type '[4 x %struct.threadInfo.6.16.26]*' (possible cause(s): structure variable cannot be decomposed due to (1) unsupported type conversion; (2) memory copy operation; (3) function pointer used in struct; (4) unsupported pointer comparison).
ERROR: [SYNCHK 200-61] ../fpga_top.c:75: unsupported memory access on variable 'child_task_ID' which is (or contains) an array with unknown size at compile time.
ERROR: [SYNCHK 200-71] ../fpga_top.c:77: function 'pthread_mutex_init' has no function body.
INFO: [SYNCHK 200-10] 8 error(s), 0 warning(s).

I found that I should write the relevant code myself to handle this. If so, how should I write it, and what exactly should I write?

#include <stdbool.h>
#include "fpga_top.h"

int outputIndex = 0;
double core_speed[CORE_MAX] = {1.0, 1.0, 1.0, 1.0};
double outputTable[WORKLOAD_MAX*TASK_COUNT_MAX][EXCEL_Column_Size];

int readyQueueHead = 0;
int readyQueueRear = 0;
int readyQueueSize = 0;
char canContinue_ = 1;
int wlCounter = 0;      
bool flag = 1;

// Add Task to assignedQueue
void addToAssignedQueue(int task_ID, int workload_ID, int q)
{
    pthread_mutex_lock(&(workerInfos[q].workerMutex));
    while( workerInfos[q].assignedQSize>=DEEP)
    {
        pthread_cond_wait(&(workerInfos[q].workerWaitHandle_Add), &(workerInfos[q].workerMutex));
    }
    int i = workerInfos[q].assignedQRear;
    workerInfos[q].assignedQueue[i].task_ID = task_ID;
    workerInfos[q].assignedQueue[i].workload_ID = workload_ID;
    workerInfos[q].assignedQRear = (workerInfos[q].assignedQRear + 1) % DEEP;
    workerInfos[q].assignedQSize++;
    // A signal to a worker waiting to read from this queue
    pthread_cond_signal(&(workerInfos[q].workerWaitHandle));
    pthread_mutex_unlock(&(workerInfos[q].workerMutex));
}

// Read from assignedQueue
struct workItem readFromAssignedQueue(int q)
{
    struct threadInfo *workerInfo_ = &workerInfos[q];
    pthread_mutex_lock(&(workerInfo_->workerMutex));

    struct workItem tas_;
    // Initialize the output values (which may not be necessary now)
    tas_.task_ID = -1;
    tas_.workload_ID = -1;
    if(workerInfo_->assignedQSize <= 0)
    {
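        // pthread_cond_timedwait expects an absolute deadline, not a relative interval
        struct timespec time_to_wait;
        clock_gettime(CLOCK_REALTIME, &time_to_wait);
        time_to_wait.tv_sec += 10; // wait at most 10 seconds from now
        pthread_cond_timedwait(&(workerInfo_->workerWaitHandle), &(workerInfo_->workerMutex), &time_to_wait);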
    }
    if(workerInfo_->assignedQSize >0)
    {
        // Reading the assignedQueue if data is available
        tas_ = workerInfo_->assignedQueue[workerInfo_->assignedQHead];
        // Move forward the queue head index rotationally
        workerInfos[q].assignedQHead = (workerInfos[q].assignedQHead + 1) % DEEP;
        // Decreasing the count number of queue elements
        workerInfos[q].assignedQSize--;
        pthread_cond_signal(&(workerInfos[q].workerWaitHandle_Add));
    }
    pthread_mutex_unlock(&(workerInfo_->workerMutex));
    return tas_;
}

// Add Definition of Task to DAG
void addTask(int task_ID, int parentCount, int child_task_ID[], int childCount, int processingTime)
{
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    p_task_->parentCount = parentCount;
    p_task_->childCount = childCount;
    p_task_->processingTime = processingTime;
    // Initialize the parentReady variable for all workloads
    for (int i = 0; i < WORKLOAD_MAX;i++) {p_task_->parentReady[i] = 0;}
    // Copy the child's index
    for (int i = 0; i < childCount; i++) {p_task_->child_task_ID[i] = child_task_ID[i];}
    // Make parentReady mutex
    pthread_mutex_init(&(p_task_->parentReadyMutex), NULL);
}

// DAG Definition
void initDag()
{
    int ch0[] = { 1, 2, 3, 4}; addTask( 0, 0, ch0, 4, 10000);

    int ch1[] = { 5, 6, 7, 8}; addTask( 1, 1, ch1, 4, 20000);
    int ch2[] = { 5, 6, 7, 8}; addTask( 2, 1, ch2, 4, 20000);
    int ch3[] = { 5, 6, 7, 8}; addTask( 3, 1, ch3, 4, 20000);
    int ch4[] = { 5, 6, 7, 8}; addTask( 4, 1, ch4, 4, 20000);

    int ch5[] = { 9, 10}; addTask( 5, 4, ch5, 2, 30000);
    int ch6[] = { 9, 10}; addTask( 6, 4, ch6, 2, 30000);
    int ch7[] = { 9, 10}; addTask( 7, 4, ch7, 2, 30000);
    int ch8[] = { 9, 10}; addTask( 8, 4, ch8, 2, 30000);

    int ch9[] = { 11, 12}; addTask( 9, 4, ch9, 2, 40000);
    int ch10[] = { 11, 12}; addTask( 10, 4, ch10, 2, 40000);

    int ch11[] = {}; addTask( 11, 2, ch11, 0, 50000);
    int ch12[] = {}; addTask( 12, 2, ch12, 0, 50000);

    addToReadyQueue(0, 0);              // Root task, addToReadyQueue(int task_ID, int workload_ID)
    readFromReadyQueue();
    //allocateTask(0, 0, 0);                    // allocateTask(int task_ID, int workload_ID, int core_ID)
}

// Add Task to the end of the readyQueue 
void addToReadyQueue(int task_ID, int workload_ID)
{
    pthread_mutex_lock(&readyQMutex);
    while(readyQueueSize >= READY_LOOP_DEEP)
    {
        // Waiting for the queue to be empty if there is no space
        int res = pthread_cond_wait( &readyQWaitHandleAdd, &readyQMutex);
    }
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) added to readyQueue %d:%d.\n", task_ID, workload_ID,readyQueueRear, readyQueueSize);
    #endif
    readyQueue[readyQueueRear].task_ID = task_ID;
    readyQueue[readyQueueRear].workload_ID = workload_ID;
    // Move forward the queue rear index in rotation
    readyQueueRear = (readyQueueRear + 1) % READY_LOOP_DEEP;
    // Increasing the number of the queue elements
    readyQueueSize++;
    // The signal is given to workers waiting to read from the queue
    pthread_cond_signal(&readyQWaitHandleRead);
    pthread_mutex_unlock(&readyQMutex);
}

// Read from the beginning of the readyQueue
struct workItem readFromReadyQueue()
{
    struct workItem witem;
    witem.task_ID = -1;
    witem.workload_ID = -1;

    pthread_mutex_lock(&readyQMutex);
    // Waiting to queue if empty
    while(readyQueueSize <= 0)
    {
        pthread_cond_wait( &readyQWaitHandleRead, &readyQMutex);
    }
    // Picking up from queue head
    witem = readyQueue[readyQueueHead];
    // Move forward the queue head index in rotation
    readyQueueHead = (readyQueueHead + 1) % READY_LOOP_DEEP;
    // Reduce the number of queue elements
    readyQueueSize--;
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) removed from readyQueue. %d : %d\n", witem.task_ID , witem.workload_ID, readyQueueHead, readyQueueSize);
    #endif
    // The signal is given to workers who are waiting for the queue to be empty
    pthread_cond_signal(&readyQWaitHandleAdd);
    pthread_mutex_unlock(&readyQMutex);
    return witem;
}

// Check if the readyQueue is empty, holding the corresponding mutex
int isReadyQueueEmpty()
{
    int res = 0;
    pthread_mutex_lock(&readyQMutex);
    res = (readyQueueSize == 0);
    pthread_mutex_unlock(&readyQMutex);
    return res;
}

// Assigning Task to the Worker (Cores)
struct outputsFromFPGA allocateTask(int task_ID, int workload_ID, int core_ID)
{
    if (flag == 1)
    {
        initDag();
        flag = 0;
    }
    #ifdef PRINT_AllocateTask
        printf("Task #%d (workload #%d) assigned to Core #%d;\n", task_ID, workload_ID, core_ID);
    #endif
    addToAssignedQueue( task_ID, workload_ID, core_ID);

    struct outputsFromFPGA FPGAOutputs;
    FPGAOutputs.task_ID = task_ID;
    FPGAOutputs.workload_ID = workload_ID;
    FPGAOutputs.core_ID = core_ID;
    // The function is declared to return a struct, so return it explicitly
    return FPGAOutputs;
}

// Ending each task and inform the children
void taskDone(int task_ID, int workload_ID, int core_ID)
{
    struct Task_Package_Profile task_ = taskArray[task_ID];
    #ifdef PRINT_TaskDone
        printf("taskDone: Task #%d (workload #%d);\n", task_ID, workload_ID);
    #endif
    // Increase the child's parentReady variable and send the children to the ready queue if all parents are finished
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    for(int i = 0; i < p_task_->childCount; i++)
    {
        struct Task_Package_Profile *p_childTsk = &(taskArray[p_task_->child_task_ID[i]]);
        int nbParentReady = 0;
        // Increase the parentReady variable
        pthread_mutex_lock(&(p_childTsk->parentReadyMutex));
        nbParentReady = ++(p_childTsk->parentReady[workload_ID]);
        pthread_mutex_unlock(&(p_childTsk->parentReadyMutex));
        // Send the child to the ready queue if all parents are finished
        if (nbParentReady == p_childTsk->parentCount)
            addToReadyQueue(p_task_->child_task_ID[i], workload_ID);
    }
    pthread_mutex_lock(&assignQSizeCheckMutex);
    // Find the most empty assignedQueue and assign ready tasks as much as possible
    while(!isReadyQueueEmpty())
    {   // Finds the best assignedQueue
        int minQueue = 0;
        int minSize =  workerInfos[0].assignedQSize;
        for (int i = 1; i < CORE_MAX; i++)
        {
            if(workerInfos[i].assignedQSize < minSize)
            {
                minSize = workerInfos[i].assignedQSize;
                minQueue = i;
            }
        }
        // The most empty queue should be smaller than Deep so that it can be added to the queue
        if(minSize < DEEP)
        {
            struct workItem witem = readFromReadyQueue();
            struct outputsFromFPGA FPGAOutputs = allocateTask(witem.task_ID, witem.workload_ID, minQueue);
        }
        else
        {
            break;  // All assignedQueue are full
        }
    }
    pthread_mutex_unlock(&assignQSizeCheckMutex);
}

// Check the end of the program that has all the tests done
void finishCheck()
{
    if (wlCounter != WORKLOAD_MAX) return;
    for(int i = 0; i < CORE_MAX; i++)
    {
        if (workerInfos[i].assignedQSize > 0) return;
        if (workerInfos[i].coreState > 0) return;
    }
    if (!isReadyQueueEmpty()) return;
    canContinue_ = 0;
    for(int i = 0; i < CORE_MAX; i++)
        pthread_cond_signal(&(workerInfos[i].workerWaitHandle));
}

Thread synchronization can be done in HLS, as shown for example in this paper, but it is not yet supported in Vivado HLS.

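That said, this does not mean your application cannot be implemented in hardware. One approach is to implement each thread as a separate hardware kernel. The shared data can be placed in another kernel, which ensures that accesses to the data are synchronized the way you want. The kernels can communicate with the shared object through stream interfaces. You can use hls::stream to implement function arguments as stream interfaces. Once each kernel has been implemented as an IP block, you can connect them in a Vivado block design through FIFOs generated by the FIFO Generator.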
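As a rough sketch of that idea, the ready queue from the question could live inside a single kernel that is the only code touching the head, rear, and size counters, so no mutex is needed. Everything here is an assumption for illustration: the `Stream` class is a small stand-in that mirrors the `read`, `write`, `empty`, and `read_nb` methods of `hls::stream` so the example compiles without the Vivado headers, and `ready_queue_step`, `WorkItem`, and the depth of 8 are hypothetical names and values, not taken from your project.

```cpp
#include <cassert>
#include <queue>

// Stand-in for hls::stream<T> (assumption: only the methods used below).
// In a real design, include <hls_stream.h> and use hls::stream<T> instead.
template <typename T>
class Stream {
public:
    bool empty() const { return q_.empty(); }
    void write(const T& v) { q_.push(v); }
    T read() {                       // blocking read in hardware; here it must not be empty
        assert(!q_.empty());
        T v = q_.front();
        q_.pop();
        return v;
    }
    bool read_nb(T& v) {             // non-blocking read: false if nothing is available
        if (q_.empty()) return false;
        v = q_.front();
        q_.pop();
        return true;
    }
private:
    std::queue<T> q_;
};

struct WorkItem { int task_ID; int workload_ID; };

const int READY_LOOP_DEEP = 8;       // assumed queue depth

// One call models one step of the kernel that owns the ready queue.
// Producers send items on push_in; consumers send a token on pop_req
// and receive the item on pop_out.
void ready_queue_step(Stream<WorkItem>& push_in,
                      Stream<bool>& pop_req,
                      Stream<WorkItem>& pop_out) {
    static WorkItem queue[READY_LOOP_DEEP];
    static int head = 0, rear = 0, size = 0;

    // Accept a new item only if there is room; otherwise it stays in the
    // stream, which plays the role of pthread_cond_wait on a full queue.
    WorkItem item;
    if (size < READY_LOOP_DEEP && push_in.read_nb(item)) {
        queue[rear] = item;
        rear = (rear + 1) % READY_LOOP_DEEP;
        size++;
    }

    // Serve a pop request only if an item exists; otherwise the request
    // stays pending, like a reader waiting on an empty queue.
    bool req;
    if (size > 0 && pop_req.read_nb(req)) {
        pop_out.write(queue[head]);
        head = (head + 1) % READY_LOOP_DEEP;
        size--;
    }
}
```

Because only this one function reads and writes the counters, the lock, wait, and signal calls disappear; the FIFOs between the kernels provide the back-pressure instead.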

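For example, you can create a control stream from each processing kernel to the shared object, which lets the kernels send requests to access the shared object. In the shared object, you use non-blocking reads on the streams to see whether any kernel wants exclusive access. After that, you accept read or write requests only from the control stream of the kernel that was granted exclusive access. The data associated with reads and writes can be passed over dedicated data streams between the kernel and the shared object. When a kernel is done with the shared object, it sends a release command, and the shared object starts looking for requests on all control streams again. It takes some work, but it is a workable approach.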
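A minimal sketch of that request, grant, and release protocol could look like the following. It is only an illustration under stated assumptions: `Stream` is a stand-in for `hls::stream` limited to the methods used here, the command names, `NUM_CORES`, and `shared_object_step` are hypothetical, the shared object is reduced to a single `int`, and a kernel is assumed to put its data on the data stream before it sends `CMD_WRITE`.

```cpp
#include <cassert>
#include <queue>

// Stand-in for hls::stream<T> (assumption: only the methods used below).
template <typename T>
class Stream {
public:
    bool empty() const { return q_.empty(); }
    void write(const T& v) { q_.push(v); }
    T read() {                        // blocking in hardware; must not be empty here
        assert(!q_.empty());
        T v = q_.front();
        q_.pop();
        return v;
    }
    bool read_nb(T& v) {              // non-blocking read
        if (q_.empty()) return false;
        v = q_.front();
        q_.pop();
        return true;
    }
private:
    std::queue<T> q_;
};

const int NUM_CORES = 4;              // assumed number of processing kernels
enum Command { CMD_ACQUIRE, CMD_WRITE, CMD_READ, CMD_RELEASE };

// One call models one step of the shared-object kernel.
void shared_object_step(Stream<Command> ctrl[NUM_CORES],
                        Stream<int> data_in[NUM_CORES],
                        Stream<int> data_out[NUM_CORES]) {
    static int owner = -1;            // -1 means nobody holds exclusive access
    static int shared_value = 0;      // the protected data (simplified)

    Command cmd;
    if (owner < 0) {
        // Poll every control stream without blocking; the lowest index wins.
        for (int i = 0; i < NUM_CORES; i++) {
            if (ctrl[i].read_nb(cmd)) {
                if (cmd == CMD_ACQUIRE) owner = i;  // other commands are dropped
                break;
            }
        }
        return;
    }

    // Exclusive phase: listen only to the owner's control stream.
    if (!ctrl[owner].read_nb(cmd)) return;
    switch (cmd) {
    case CMD_WRITE:   shared_value = data_in[owner].read();  break;
    case CMD_READ:    data_out[owner].write(shared_value);   break;
    case CMD_RELEASE: owner = -1;                            break;
    default:          break;          // a repeated acquire from the owner is ignored
    }
}
```

A kernel that wants the shared object would send `CMD_ACQUIRE`, then its read or write commands, then `CMD_RELEASE`. Requests from the other kernels simply wait in their own control FIFOs until the owner releases, which is the hardware counterpart of blocking on a mutex. The fixed scan order gives lower-numbered kernels priority; a rotating start index would be fairer if that matters for your scheduler.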