Unable to understand or fix race condition in my program
I am implementing my own version of malloc. It is very similar to glibc's malloc in that it supports multithreading by creating arenas, which are memory regions a thread can work in without risk of racing with another thread.
My data structures are as follows:
typedef struct s_arena {
    pthread_mutex_t mutex;
    t_pool          *main_pool;
} t_arena;

typedef struct s_arena_data {
    _Atomic int arena_count;
    t_arena     arenas[M_ARENA_MAX];
} t_arena_data;
t_arena_data is a global variable that holds the number of arenas created, starting at 0 on the first call and capped at M_ARENA_MAX (which I currently define as 8), as well as an array containing all my arenas.
An arena only contains a mutex, initialized with pthread_mutex_init(), and a pointer to a memory pool. The memory pool is irrelevant to this topic, because the race condition happens before it is ever reached.
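For context, a minimal sketch of how these globals might be declared (t_pool is left opaque since the pool layout is irrelevant here, and new_arena_mutex is the global creation mutex mentioned below):

```c
#include <pthread.h>

#define M_ARENA_MAX 8

typedef struct s_pool t_pool; /* opaque: the pool layout is irrelevant here */

typedef struct s_arena {
    pthread_mutex_t mutex;
    t_pool         *main_pool;
} t_arena;

typedef struct s_arena_data {
    _Atomic int arena_count;
    t_arena     arenas[M_ARENA_MAX];
} t_arena_data;

t_arena_data    arena_data; /* global; arena_count starts at 0 */
pthread_mutex_t new_arena_mutex = PTHREAD_MUTEX_INITIALIZER;

/* An arena's own mutex is initialized only when that arena is created: */
static void init_arena(int index, t_pool *pool) {
    arena_data.arenas[index] = (t_arena){ .main_pool = pool };
    pthread_mutex_init(&arena_data.arenas[index].mutex, NULL);
}
```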
How my program works: as each thread enters malloc, it tries to pthread_mutex_trylock the first arena's mutex. If that succeeds, all is well and it proceeds with the allocation, which I won't describe here. If not, several things can happen.
If the next entry in the array is empty and M_ARENA_MAX has not been reached, a new mutex is locked in order to create a new arena and add it to the array. That mutex is global, meaning no two threads can create an arena at the same time.
If that mutex is already locked, the thread loops back to arena[0] and keeps searching for an open mutex.
Now, I am pretty sure the race condition happens because of the variable arena_count. Thanks to debug printf statements, I have observed that whenever the function segfaults, M_ARENA_MAX has not been reached; if it has, the program won't crash. So I suspect one thread might be reading the value of arena_count just before another thread increments it, and by the time it finishes reading, the incrementing thread releases new_arena_mutex and the first thread goes on to create an arena with a wrong index.
This is my first multithreaded program, so I apologize if my explanation or my code is unclear, but I have spent the last four hours on this problem and, although I think I have narrowed it down, I really don't know how to fix it.
Here is the faulty part of the code:
current_arena = &arena_data.arenas[0];
int arena_index = 0;

while (pthread_mutex_trylock(&current_arena->mutex) != 0) {

    printf("THREAD %p READS[0] ARENA COUNT AT %d\n", (void *)pthread_self(), arena_data.arena_count);
    if (arena_index == arena_data.arena_count - 1) {

        printf("THREAD %p READS[1] ARENA COUNT AT %d\n", (void *)pthread_self(), arena_data.arena_count);
        if (pthread_mutex_trylock(&new_arena_mutex) != 0 || arena_data.arena_count == M_ARENA_MAX) {
            current_arena = &arena_data.arenas[(arena_index = 0)];
            continue;
        }

        creator = true;
        break;
    }

    current_arena = &arena_data.arenas[arena_index++];
}

/* All arenas are occupied by other threads but M_ARENA_MAX isn't reached. Let's just create a new one. */
if (creator == true) {

    printf("THREAD %p READS[2] ARENA COUNT AT %d\n", (void *)pthread_self(), arena_data.arena_count);
    current_pool = create_new_pool(MAIN_POOL, chunk_type, size, pagesize, &new_arena_mutex);
    if (current_pool == MAP_FAILED) return NULL;

    ++arena_data.arena_count;
    arena_data.arenas[arena_index + 1] = (t_arena){ .main_pool = current_pool };
    pthread_mutex_init(&arena_data.arenas[arena_index + 1].mutex, NULL);
    pthread_mutex_lock(&arena_data.arenas[arena_index + 1].mutex);
    pthread_mutex_unlock(&new_arena_mutex);

    return user_area((t_alloc_chunk *)current_pool->chunk, size, &arena_data.arenas[arena_index + 1].mutex);
}
Here is one of the printf outputs that confirms my theory that there is a race condition:
THREAD 0x7f9c3b216700 READS[1] ARENA COUNT AT 4
THREAD 0x7f9c3b216700 READS[2] ARENA COUNT AT 5
The values should be equal, but they aren't.

I can spot three issues in your code.

1. Race condition when two threads create arenas

This is the race condition you describe in your question:
So I am suspecting that one thread might be reading the value of arena_count just before an other thread is incrementing it, and by the time it finishes reading it, the thread that incremented it releases the new_arena_mutex and the first thread goes in creating an arena with a wrong index.
Yes, this can happen. The load from arena_data.arena_count happens atomically, but a thread may not generally assume that the value is (still) correct. The modified version does not fix the issue.
To fix this, the following guarantee may help: any store to arena_data.arena_count happens while holding the new_arena_mutex. As a result, a thread holding the mutex can safely load arena_data.arena_count (while holding the mutex, of course) and can be sure that its value does not change before it unlocks the mutex. Let me try to explain by changing and commenting your updated code:
while (pthread_mutex_trylock(&current_arena->mutex) != 0) {
    if (arena_index == arena_data.arena_count - 1) {
        // This thread checks the condition above _without_ holding the
        // `new_arena_mutex`. Another thread may hold the mutex (and hence it
        // may increment `arena_count`).
        if (pthread_mutex_trylock(&new_arena_mutex) == 0) {
            // Now, this thread can assume that no other thread writes to
            // `arena_data.arena_count`. However, the condition
            //
            //     arena_index == arena_data.arena_count - 1
            //
            // may no longer be true (because it had been checked before locking).
            if (arena_data.arena_count < M_ARENA_MAX) {
                // This thread may create a new arena at index
                // `arena_data.arena_count`. That is safe because this thread holds
                // the `new_arena_mutex` (preventing other threads from modifying
                // `arena_count`).
                //
                // However, it is possible that `arena_index` is not at the position
                // of the most recently created arena (checked _before_ locking). Let
                // us just assume that all the recently created arenas are still
                // locked. Hence we just skip the check and directly jump to the most
                // recently created arena (as if we failed locking).
                arena_index = arena_data.arena_count - 1;
                current_arena = &arena_data.arenas[arena_index];

                ++arena_data.arena_count;
                assert(
                    arena_index + 1 == arena_data.arena_count &&
                    "... and this thread is holding the mutex, so it stays true."
                );
                creator = true;
                break;
            } else {
                pthread_mutex_unlock(&new_arena_mutex);
            }
In my opinion, the code would become more readable if these operations were extracted into functions such as:
// both functions return `arena_index` or `-1`
int try_find_and_lock_arena();
int try_create_and_lock_arena();
2. Suspicious (wrong?) post-increment operator

The post-increment operator in the following line looks wrong to me:
current_arena = &arena_data.arenas[arena_index++]; // post-increment
// now, `&arena_data.arenas[arena_index]` is one beyond `current_arena`.
Written as two lines, the behavior may be easier to reason about:
assert(
current_arena == &arena_data.arenas[arena_index] &&
"this is an invariant I expect to hold"
);
current_arena = &arena_data.arenas[arena_index]; // this is a no-op...
arena_index++; // ... and now, they are out of sync
assert(
current_arena == &arena_data.arenas[arena_index] &&
"now, the invariant is broken (and this assert should fire)"
);
3. Readability of the mutex lock/unlock pairs

I find it difficult to match the lock/unlock operations of the mutexes for all possible paths, because they happen in different scopes.
// [new_arena_mutex is locked]
current_pool = create_new_pool(/* ... */, &new_arena_mutex);
if (current_pool == MAP_FAILED) return NULL; // error-path return
// `create_new_pool` unlocks iff it returns `MAP_FAILED`...
/* ... */
pthread_mutex_unlock(&new_arena_mutex);
// ... otherwise, the mutex is unlocked here
return user_area(/* ... */);
(Edit: it does not.)
This seems to have fixed the issue:
/* Look for an open arena. */
current_arena = &arena_data.arenas[0];
int arena_index = 0;

while (pthread_mutex_trylock(&current_arena->mutex) != 0) {
    if (arena_index == arena_data.arena_count - 1) {
        if (pthread_mutex_trylock(&new_arena_mutex) == 0) {
            if (arena_data.arena_count < M_ARENA_MAX) {
                ++arena_data.arena_count;
                creator = true;
                break;
            } else {
                pthread_mutex_unlock(&new_arena_mutex);
            }
        }
        current_arena = &arena_data.arenas[(arena_index = 0)];
        continue;
    }
    current_arena = &arena_data.arenas[arena_index++];
}

/* All arenas are occupied by other threads but M_ARENA_MAX isn't reached. Let's just create a new one. */
if (creator == true) {
    current_pool = create_new_pool(MAIN_POOL, chunk_type, size, pagesize, &new_arena_mutex);
    if (current_pool == MAP_FAILED) return NULL;

    arena_data.arenas[arena_index + 1] = (t_arena){ .main_pool = current_pool };
    pthread_mutex_init(&arena_data.arenas[arena_index + 1].mutex, NULL);
    pthread_mutex_lock(&arena_data.arenas[arena_index + 1].mutex);
    pthread_mutex_unlock(&new_arena_mutex);

    return user_area((t_alloc_chunk *)current_pool->chunk, size, &arena_data.arenas[arena_index + 1].mutex);
}