组织多头数据记录
Organize multitrheading datalogging
我正在为嵌入式目标开发一个 C 项目 运行 Linux 发行版(使用 Yocto 构建)。我是 Linux 嵌入式世界的新手,我必须构建一个数据记录系统。
我已经在学习如何使用线程,我正在考虑项目组织。
这是我的想象:
- 多线程从不同的接口、CAN 总线、I2C...(不同的采样率)收集数据
- 一个采样率为 200 毫秒的线程来填充 csv 文件
- 一个采样率为3秒的线程用http请求发送数据
- 线程将在 CAN 信息或外部事件上停止
我不知道组织这个项目的最佳方式是什么。我看到了两种方法,第一种是启动程序创建每个线程并在 while 循环中等待事件监视以停止它们。第二种方式是启动程序将其他二进制文件作为线程执行。
在这两种方式中我不知道如何在线程之间共享数据。
可以分享一下你的经验吗?
谢谢
编辑:
首先,非常感谢@Glärbo 的解释。学习多线程机制真的很有帮助。
我测试成功了。
为了以后的读者,我绘制了图表来说明@Glärbo 的回答。
main thread
productor-sensor thread
datalogger thread
我会做得更简单,使用简单的多生产者、单一消费者方法。
假设每个数据项都可以用一个数值来描述:
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
我会使用两个值列表:一个用于接收但未存储的传感器读数,一个用于未使用的值桶。这样你就不需要动态分配或释放价值桶,除非你想(通过操纵未使用的列表)。
两个列表都受互斥体保护。由于未使用列表可能为空,我们需要一个条件变量(每当向其添加新的未使用值时发出信号)以便线程可以等待一个可用。接收到的列表同样需要一个条件变量,这样如果消费者(数据存储者)想要它们时它恰好是空的,它可以等待至少一个出现。
static pthread_mutex_t unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unused_wait = PTHREAD_COND_INITIALIZER;
static struct value *unused_list = NULL;
static pthread_mutex_t received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t received_wait = PTHREAD_COND_INITIALIZER;
static struct value *received_list = NULL;
对于未使用的列表,我们需要三个助手:一个从头开始创建新的未使用的价值项目(您最初调用它是为了为每个传感器创建两到三个价值项目,再加上一些),然后,如果你认为你需要它们(比如,如果你添加新的传感器 运行 时间):
int unused_create(void)
{
struct value *v;
v = malloc(sizeof *v);
if (!v)
return ENOMEM;
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
需要另外两个来获取有价值的项目 from/back 并将其放入列表:
struct value *unused_get(void)
{
struct value *v;
pthread_mutex_lock(&unused_lock);
while (!unused_list)
pthread_cond_wait(&unused_wait, &unused_lock);
v = unused_list;
unused_list = unused_list->next;
pthread_mutex_unlock(&unused_lock);
v->from = NULL;
return v;
}
void unused_put(struct value *v)
{
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
}
上面的想法是,当 from
成员为 NULL 时,该项目未使用(因为它不是来自任何传感器)。从技术上讲,我们不需要在每个阶段都将它清除为 NULL,但我喜欢彻底:设置它并不是一个代价高昂的操作。
传感器访问生产商获取传感器读数,使用例如获取当前时间clock_gettime(CLOCK_REALTIME, ×pec)
,然后使用unused_get()
获取一个新的未使用的物品。 (顺序很重要,因为如果没有免费物品,unused_get()
可能需要一些时间。)然后,他们填写字段,并调用以下 received_put()
将阅读添加到列表中:
void received_put(struct value *v)
{
pthread_mutex_lock(&received_lock);
v->next = received_list;
received_list = v;
pthread_mutex_signal(&received_wait);
pthread_mutex_unlock(&received_lock);
}
只有一个线程定期收集所有接收到的传感器读数,并存储它们。它可以保留一组最近的读数,并定期发送这些读数。与其重复调用一些 received_get()
直到没有更多未处理的接收值,我们应该使用一个 return 包含它们的整个列表的函数:
struct value *received_getall(void)
{
struct value *v;
pthread_mutex_lock(&received_lock);
while (!received_list)
pthread_cond_wait(&received_wait, &received_lock);
v = received_list;
received_list = NULL;
pthread_mutex_unlock(&received_lock);
return v;
}
消费者线程,storing/sending总结和阅读,应该获取整个列表,然后一一处理。处理完每个项目后,应将它们添加到未使用列表中。换句话说,像
struct value *all, v;
while (1) {
all = receive_getall();
while (all) {
v = all;
all = all->next;
v->next = NULL;
/* Store/summarize value item v */
unused_put(v);
}
}
如您所见,当消费者线程正在处理传感器值项时,传感器线程可以为下一轮添加新读数,只要有足够的空闲值项桶可供使用。
当然,您也可以在一次 malloc() 调用中分配很多值,但是您必须以某种方式记住每个值属于哪个值池才能释放它们。所以:
struct owner {
size_t size; /* Number of value's */
size_t used; /* Number of value's not freed yet */
struct value value[];
};
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct owner *owner; /* Part of which value array, NULL if standalone */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
int unused_add_array(const size_t size)
{
struct owner *o;
struct value *v;
size_t i;
o = malloc(sizeof (struct owner) + size * sizeof (struct value));
if (!o)
return ENOMEM;
o->size = size;
o->used = used;
i = size - 1;
pthread_mutex_lock(&unused_lock);
o->value[i].next = unused_list;
while (i-->0)
o->value[i].next = o->value + i + 1;
unused_list = o->value[0];
pthread_cond_broadcast(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
/* Instead of unused_put(), call unused_free() to discard a value */
void unused_free(struct value *v)
{
pthread_mutex_lock(&unused_lock);
v->from = NULL;
if (v->owner) {
if (v->owner->used > 1) {
v->owner->used--;
return;
}
v->owner->size = 0;
v->owner->used = 0;
free(v->owner);
return;
}
free(v);
return;
}
unused_free()
使用 unused_lock
的原因是我们必须确保在释放存储桶时没有其他线程正在访问它。否则,我们可以进行竞争 window,其他线程可能会在我们 free()d 之后使用该值。
请记住,与大多数其他 C 库一样,Linux C 库不会 return 在 free() 时为操作系统动态分配内存;内存只有在足够大的情况下才会被 returned。 (目前在 x86 和 x86-64 上,glibc 限制大约为 132,000 字节左右;任何更小的都留在进程堆中,用于满足未来的 malloc()/calloc()/realloc() 调用。)
struct sensor
的内容由你决定,但我个人认为至少
struct sensor {
pthread_t worker;
int connfd; /* Device or socket descriptor */
const char *name; /* Some kind of identifier, perhaps header in CSV */
const char *units; /* Optional, could be useful */
};
加上可能的传感器读取间隔(以毫秒为单位)。
实际上,因为只有一个消费者线程,所以我会使用主线程。
我正在为嵌入式目标开发一个 C 项目 运行 Linux 发行版(使用 Yocto 构建)。我是 Linux 嵌入式世界的新手,我必须构建一个数据记录系统。
我已经在学习如何使用线程,我正在考虑项目组织。
这是我的想象:
- 多线程从不同的接口、CAN 总线、I2C...(不同的采样率)收集数据
- 一个采样率为 200 毫秒的线程来填充 csv 文件
- 一个采样率为3秒的线程用http请求发送数据
- 线程将在 CAN 信息或外部事件上停止
我不知道组织这个项目的最佳方式是什么。我看到了两种方法,第一种是启动程序创建每个线程并在 while 循环中等待事件监视以停止它们。第二种方式是启动程序将其他二进制文件作为线程执行。 在这两种方式中我不知道如何在线程之间共享数据。
可以分享一下你的经验吗?
谢谢
编辑: 首先,非常感谢@Glärbo 的解释。学习多线程机制真的很有帮助。
我测试成功了。
为了以后的读者,我绘制了图表来说明@Glärbo 的回答。
main thread
productor-sensor thread
datalogger thread
我会做得更简单,使用简单的多生产者、单一消费者方法。
假设每个数据项都可以用一个数值来描述:
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
我会使用两个值列表:一个用于接收但未存储的传感器读数,一个用于未使用的值桶。这样你就不需要动态分配或释放价值桶,除非你想(通过操纵未使用的列表)。
两个列表都受互斥体保护。由于未使用列表可能为空,我们需要一个条件变量(每当向其添加新的未使用值时发出信号)以便线程可以等待一个可用。接收到的列表同样需要一个条件变量,这样如果消费者(数据存储者)想要它们时它恰好是空的,它可以等待至少一个出现。
static pthread_mutex_t unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unused_wait = PTHREAD_COND_INITIALIZER;
static struct value *unused_list = NULL;
static pthread_mutex_t received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t received_wait = PTHREAD_COND_INITIALIZER;
static struct value *received_list = NULL;
对于未使用的列表,我们需要三个助手:一个从头开始创建新的未使用的价值项目(您最初调用它是为了为每个传感器创建两到三个价值项目,再加上一些),然后,如果你认为你需要它们(比如,如果你添加新的传感器 运行 时间):
int unused_create(void)
{
struct value *v;
v = malloc(sizeof *v);
if (!v)
return ENOMEM;
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
需要另外两个来获取有价值的项目 from/back 并将其放入列表:
struct value *unused_get(void)
{
struct value *v;
pthread_mutex_lock(&unused_lock);
while (!unused_list)
pthread_cond_wait(&unused_wait, &unused_lock);
v = unused_list;
unused_list = unused_list->next;
pthread_mutex_unlock(&unused_lock);
v->from = NULL;
return v;
}
void unused_put(struct value *v)
{
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
}
上面的想法是,当 from
成员为 NULL 时,该项目未使用(因为它不是来自任何传感器)。从技术上讲,我们不需要在每个阶段都将它清除为 NULL,但我喜欢彻底:设置它并不是一个代价高昂的操作。
传感器访问生产商获取传感器读数,使用例如获取当前时间clock_gettime(CLOCK_REALTIME, ×pec)
,然后使用unused_get()
获取一个新的未使用的物品。 (顺序很重要,因为如果没有免费物品,unused_get()
可能需要一些时间。)然后,他们填写字段,并调用以下 received_put()
将阅读添加到列表中:
void received_put(struct value *v)
{
pthread_mutex_lock(&received_lock);
v->next = received_list;
received_list = v;
pthread_mutex_signal(&received_wait);
pthread_mutex_unlock(&received_lock);
}
只有一个线程定期收集所有接收到的传感器读数,并存储它们。它可以保留一组最近的读数,并定期发送这些读数。与其重复调用一些 received_get()
直到没有更多未处理的接收值,我们应该使用一个 return 包含它们的整个列表的函数:
struct value *received_getall(void)
{
struct value *v;
pthread_mutex_lock(&received_lock);
while (!received_list)
pthread_cond_wait(&received_wait, &received_lock);
v = received_list;
received_list = NULL;
pthread_mutex_unlock(&received_lock);
return v;
}
消费者线程,storing/sending总结和阅读,应该获取整个列表,然后一一处理。处理完每个项目后,应将它们添加到未使用列表中。换句话说,像
struct value *all, v;
while (1) {
all = receive_getall();
while (all) {
v = all;
all = all->next;
v->next = NULL;
/* Store/summarize value item v */
unused_put(v);
}
}
如您所见,当消费者线程正在处理传感器值项时,传感器线程可以为下一轮添加新读数,只要有足够的空闲值项桶可供使用。
当然,您也可以在一次 malloc() 调用中分配很多值,但是您必须以某种方式记住每个值属于哪个值池才能释放它们。所以:
struct owner {
size_t size; /* Number of value's */
size_t used; /* Number of value's not freed yet */
struct value value[];
};
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct owner *owner; /* Part of which value array, NULL if standalone */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
int unused_add_array(const size_t size)
{
struct owner *o;
struct value *v;
size_t i;
o = malloc(sizeof (struct owner) + size * sizeof (struct value));
if (!o)
return ENOMEM;
o->size = size;
o->used = used;
i = size - 1;
pthread_mutex_lock(&unused_lock);
o->value[i].next = unused_list;
while (i-->0)
o->value[i].next = o->value + i + 1;
unused_list = o->value[0];
pthread_cond_broadcast(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
/* Instead of unused_put(), call unused_free() to discard a value */
void unused_free(struct value *v)
{
pthread_mutex_lock(&unused_lock);
v->from = NULL;
if (v->owner) {
if (v->owner->used > 1) {
v->owner->used--;
return;
}
v->owner->size = 0;
v->owner->used = 0;
free(v->owner);
return;
}
free(v);
return;
}
unused_free()
使用 unused_lock
的原因是我们必须确保在释放存储桶时没有其他线程正在访问它。否则,我们可以进行竞争 window,其他线程可能会在我们 free()d 之后使用该值。
请记住,与大多数其他 C 库一样,Linux C 库不会 return 在 free() 时为操作系统动态分配内存;内存只有在足够大的情况下才会被 returned。 (目前在 x86 和 x86-64 上,glibc 限制大约为 132,000 字节左右;任何更小的都留在进程堆中,用于满足未来的 malloc()/calloc()/realloc() 调用。)
struct sensor
的内容由你决定,但我个人认为至少
struct sensor {
pthread_t worker;
int connfd; /* Device or socket descriptor */
const char *name; /* Some kind of identifier, perhaps header in CSV */
const char *units; /* Optional, could be useful */
};
加上可能的传感器读取间隔(以毫秒为单位)。
实际上,因为只有一个消费者线程,所以我会使用主线程。