组织多头数据记录

Organize multitrheading datalogging

我正在为嵌入式目标开发一个 C 项目 运行 Linux 发行版(使用 Yocto 构建)。我是 Linux 嵌入式世界的新手,我必须构建一个数据记录系统。

我已经在学习如何使用线程,我正在考虑项目组织。

这是我的想象:

我不知道组织这个项目的最佳方式是什么。我看到了两种方法,第一种是启动程序创建每个线程并在 while 循环中等待事件监视以停止它们。第二种方式是启动程序将其他二进制文件作为线程执行。 在这两种方式中我不知道如何在线程之间共享数据。

可以分享一下你的经验吗?

谢谢

编辑: 首先,非常感谢@Glärbo 的解释。学习多线程机制真的很有帮助。

我测试成功了。

为了以后的读者,我绘制了图表来说明@Glärbo 的回答。

main thread

productor-sensor thread

datalogger thread

我会做得更简单,使用简单的多生产者、单一消费者方法。

假设每个数据项都可以用一个数值来描述:

struct value {
    struct value   *next;  /* Forming a singly-linked list of data items */
    struct sensor  *from;  /* Identifies which sensor value this is */
    struct timespec when;  /* Time of sensor reading in UTC */
    double          value; /* Numerical value */
};

我会使用两个值列表:一个用于接收但未存储的传感器读数,一个用于未使用的值桶。这样你就不需要动态分配或释放价值桶,除非你想(通过操纵未使用的列表)。

两个列表都受互斥体保护。由于未使用列表可能为空,我们需要一个条件变量(每当向其添加新的未使用值时发出信号)以便线程可以等待一个可用。接收到的列表同样需要一个条件变量,这样如果消费者(数据存储者)想要它们时它恰好是空的,它可以等待至少一个出现。

static pthread_mutex_t  unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   unused_wait = PTHREAD_COND_INITIALIZER;
static struct value    *unused_list = NULL;

static pthread_mutex_t  received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   received_wait = PTHREAD_COND_INITIALIZER;
static struct value    *received_list = NULL;

对于未使用的列表,我们需要三个助手:一个从头开始创建新的未使用的价值项目(您最初调用它是为了为每个传感器创建两到三个价值项目,再加上一些),然后,如果你认为你需要它们(比如,如果你添加新的传感器 运行 时间):

int unused_create(void)
{
    struct value *v;

    v = malloc(sizeof *v);
    if (!v)
        return ENOMEM;

    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);

    return 0;
}

需要另外两个来获取有价值的项目 from/back 并将其放入列表:

struct value *unused_get(void)
{
    struct value *v;

    pthread_mutex_lock(&unused_lock);
    while (!unused_list)
        pthread_cond_wait(&unused_wait, &unused_lock);
    v = unused_list;
    unused_list = unused_list->next;
    pthread_mutex_unlock(&unused_lock);

    v->from = NULL;

    return v;
}

void unused_put(struct value *v)
{
    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);
}

上面的想法是,当 from 成员为 NULL 时,该项目未使用(因为它不是来自任何传感器)。从技术上讲,我们不需要在每个阶段都将它清除为 NULL,但我喜欢彻底:设置它并不是一个代价高昂的操作。

传感器访问生产商获取传感器读数,使用例如获取当前时间clock_gettime(CLOCK_REALTIME, &timespec),然后使用unused_get()获取一个新的未使用的物品。 (顺序很重要,因为如果没有免费物品,unused_get() 可能需要一些时间。)然后,他们填写字段,并调用以下 received_put() 将阅读添加到列表中:

void received_put(struct value *v)
{
    pthread_mutex_lock(&received_lock);
    v->next = received_list;
    received_list = v;
    pthread_mutex_signal(&received_wait);
    pthread_mutex_unlock(&received_lock);
}

只有一个线程定期收集所有接收到的传感器读数,并存储它们。它可以保留一组最近的读数,并定期发送这些读数。与其重复调用一些 received_get() 直到没有更多未处理的接收值,我们应该使用一个 return 包含它们的整个列表的函数:

struct value *received_getall(void)
{
    struct value *v;
    pthread_mutex_lock(&received_lock);
    while (!received_list)
        pthread_cond_wait(&received_wait, &received_lock);
    v = received_list;
    received_list = NULL;
    pthread_mutex_unlock(&received_lock);
    return v;
}

消费者线程,storing/sending总结和阅读,应该获取整个列表,然后一一处理。处理完每个项目后,应将它们添加到未使用列表中。换句话说,像

    struct value *all, v;

    while (1) {
        all = receive_getall();
        while (all) {
            v = all;
            all = all->next;
            v->next = NULL;

            /* Store/summarize value item v */

            unused_put(v);
        }
    }

如您所见,当消费者线程正在处理传感器值项时,传感器线程可以为下一轮添加新读数,只要有足够的空闲值项桶可供使用。

当然,您也可以在一次 malloc() 调用中分配很多值,但是您必须以某种方式记住每个值属于哪个值池才能释放它们。所以:

struct owner {
    size_t          size;    /* Number of value's */
    size_t          used;    /* Number of value's not freed yet */
    struct value    value[];
};
struct value {
    struct value   *next;  /* Forming a singly-linked list of data items */
    struct owner   *owner; /* Part of which value array, NULL if standalone */
    struct sensor  *from;  /* Identifies which sensor value this is */
    struct timespec when;  /* Time of sensor reading in UTC */
    double          value; /* Numerical value */
};

int unused_add_array(const size_t size)
{
    struct owner *o;
    struct value *v;
    size_t        i;

    o = malloc(sizeof (struct owner) + size * sizeof (struct value));
    if (!o)
        return ENOMEM;

    o->size = size;
    o->used = used;
    i = size - 1;

    pthread_mutex_lock(&unused_lock);
    o->value[i].next = unused_list;
    while (i-->0)
       o->value[i].next = o->value + i + 1;    
    unused_list = o->value[0];
    pthread_cond_broadcast(&unused_wait);
    pthread_mutex_unlock(&unused_lock);
    return 0;
}

/* Instead of unused_put(), call unused_free() to discard a value */
void unused_free(struct value *v)
{
    pthread_mutex_lock(&unused_lock);
    v->from = NULL;

    if (v->owner) {
        if (v->owner->used > 1) {
            v->owner->used--;
            return;
        }
        v->owner->size = 0;
        v->owner->used = 0;
        free(v->owner);
        return;
    }
    free(v);
    return;
}

unused_free() 使用 unused_lock 的原因是我们必须确保在释放存储桶时没有其他线程正在访问它。否则,我们可以进行竞争 window,其他线程可能会在我们 free()d 之后使用该值。

请记住,与大多数其他 C 库一样,Linux C 库不会 return 在 free() 时为操作系统动态分配内存;内存只有在足够大的情况下才会被 returned。 (目前在 x86 和 x86-64 上,glibc 限制大约为 132,000 字节左右;任何更小的都留在进程堆中,用于满足未来的 malloc()/calloc()/realloc() 调用。)

struct sensor的内容由你决定,但我个人认为至少

struct sensor {
    pthread_t   worker;
    int         connfd;  /* Device or socket descriptor */
    const char *name;    /* Some kind of identifier, perhaps header in CSV */
    const char *units;   /* Optional, could be useful */
};

加上可能的传感器读取间隔(以毫秒为单位)。

实际上,因为只有一个消费者线程,所以我会使用主线程。