多源 BFS 多线程
Multi source BFS multithreading
我有一个用邻接矩阵表示的图 arr
。以及多个起始顶点的向量 source
。
我的想法是根据线程数将 source
向量分成 "equal" 部分(如果它没有平均分割,我将剩余部分添加到最后一部分)。并创建 运行 这个函数的线程。 bool used[]
是全局 array
我正在尝试获得(我认为它叫做)"liner" 缩放。我假设起始顶点的数量至少等于线程的数量。
如果我使用互斥量来同步线程,效率会很低。
如果我不这样做,一些顶点会被多次遍历。
问题是否有一种数据结构可以让我删除互斥体?
或其他实现此算法的方法?
mutex m;
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
used[s[i]] = true;
}
while (!que.empty())
{
int curr = que.front();
que.pop();
cout << curr << " ";
for (auto i = 0; i < n; ++i)
{
lock_guard<mutex> guard(m);
if (arr[curr][i] == 1 && !used[i] && curr != i)
{
que.push(i);
used[i] = true;
}
}
}
}```
有了 atomic<bool>
我想你快到了。唯一缺少的是原子交换操作。它允许您将读取-修改-写入作为原子操作。对于bool
原子类型,通常有一个硬件支持它。
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
//used[i] initialized to 'false' for all i
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
//we don't change used just yet!
}
while (!que.empty())
{
int curr = que.front();
que.pop();
bool alreadyUsed = used[curr].exchange(true);
if(alreadyUsed) {
continue; //some other thread already processing it
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && !used[i] && curr != i) {
que.push(i);
}
}
}
}
注意,有一个逻辑上的变化:当线程开始处理节点时,used[i]
设置为 true
,而不是当它被添加到队列时。
在第一次尝试处理节点时,当 used[i]
设置为 true 时,alreadyUsed
将保留先前的值 (false
),表明之前没有其他人开始处理该节点。在后续尝试处理该节点时,alreadyUsed
将已设置为 true
并且处理将跳过。
上述方法并不理想:一个节点在处理之前可能会被多次添加到队列中。根据图表的形状,这可能是问题,也可能不是问题。
如果这是一个问题 - 我建议使用三值 used
状态:not_visited
、queued
和 processed
.
static constexpr int not_visited = 0;
static constexpr int queued = 1;
static constexpr int processed = 2;
现在,每次我们尝试推送到 que
,每次我们尝试处理节点时,我们都会相应地推进状态。这些改进需要通过 compare_exchange_strong
以原子方式执行,以便每个更改可能只发生一次。调用 compare_exchange_strong
returns true
如果成功(即先前包含的值实际上与 expected
匹配)
void msBFS(bool** arr, int n, vector<int> s, atomic<int>* used) //s is a different
// piece of the original source
{
//used[i] initialized to '0' for all i
queue<int> que;
int empty = 0;
for(auto i = 0; i < s.size(); ++i) {
int expected = not_visited;
//we check it even here, because one thread may be seriously lagging behind others which already entered the while loop
if(used[s[i]].compare_exchange_strong(expected, queued)) {
que.push(s[i]);
}
}
while (!que.empty())
{
int curr = que.front();
que.pop();
int expected = queued;
if(!used[curr].compare_exchange_strong(expected, processed)) {
continue;
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && curr != i) {
int expected = not_visited;
if(used[i].compare_exchange_strong(expected, queued)) {
que.push(i);
}
}
}
}
}
检查性能。有许多原子操作,但它们通常比互斥锁便宜。在内部,mutex 也执行类似于这些的原子操作,但除此之外它可能会完全阻塞一个线程。我展示的代码从不阻塞(线程永远不会停止),所有同步仅在原子变量上完成。
编辑: 第二种方法的一些可能的优化:
- 我意识到,如果转换
not_visited->queued
保证恰好发生一次,则甚至不必执行另一个转换,因为无论如何节点在一个队列中恰好出现一次。因此,您可以节省一些原子操作并再次使用 bool
。不过,由于这种情况很少见,我认为它不会产生太大影响。
- 迭代邻居时 - 有一个 if 语句:
if (arr[curr][i] == 1 && curr != i)
- 这不会检查邻居是否被访问过。它仅在稍后通过原子交换进行检查。但是,您可能想看看在 if
范围内检查是否有帮助。如果您及早发现 used[i]
已经在队列中或已处理,则跳过该分支,并跳过不再需要的原子比较和交换。
- 如果您想压缩处理器的每个滴答声,请考虑对邻接矩阵和
used
数组使用位域,而不是布尔值。我认为对邻居的迭代和条件可以用按位运算来评估,一次 32 bits/neighbors。甚至还有 std::atomic<uint32_t>::fetch_or
可以帮助您一次更新 32 used
。
Edit2: 第一种方法的可能优化:
每个线程都可以拥有自己的 localUsed
数组,在推送到队列时将对其进行检查并设置为 true
(与您在原始代码中所做的类似)。这对于线程来说是本地的,因此没有原子、互斥等。通过这个简单的检查,您可以保证给定节点最多出现在每个线程的队列中一次。因此,一个节点最多会出现 N 次,其中 N 是线程数。
我认为这是在可伸缩性和内存占用之间值得考虑的折衷,并且可能比第二种方法表现更好。
我有一个用邻接矩阵表示的图 arr
。以及多个起始顶点的向量 source
。
我的想法是根据线程数将 source
向量分成 "equal" 部分(如果它没有平均分割,我将剩余部分添加到最后一部分)。并创建 运行 这个函数的线程。 bool used[]
是全局 array
我正在尝试获得(我认为它叫做)"liner" 缩放。我假设起始顶点的数量至少等于线程的数量。
如果我使用互斥量来同步线程,效率会很低。 如果我不这样做,一些顶点会被多次遍历。 问题是否有一种数据结构可以让我删除互斥体? 或其他实现此算法的方法?
mutex m;
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
used[s[i]] = true;
}
while (!que.empty())
{
int curr = que.front();
que.pop();
cout << curr << " ";
for (auto i = 0; i < n; ++i)
{
lock_guard<mutex> guard(m);
if (arr[curr][i] == 1 && !used[i] && curr != i)
{
que.push(i);
used[i] = true;
}
}
}
}```
有了 atomic<bool>
我想你快到了。唯一缺少的是原子交换操作。它允许您将读取-修改-写入作为原子操作。对于bool
原子类型,通常有一个硬件支持它。
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
//used[i] initialized to 'false' for all i
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
//we don't change used just yet!
}
while (!que.empty())
{
int curr = que.front();
que.pop();
bool alreadyUsed = used[curr].exchange(true);
if(alreadyUsed) {
continue; //some other thread already processing it
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && !used[i] && curr != i) {
que.push(i);
}
}
}
}
注意,有一个逻辑上的变化:当线程开始处理节点时,used[i]
设置为 true
,而不是当它被添加到队列时。
在第一次尝试处理节点时,当 used[i]
设置为 true 时,alreadyUsed
将保留先前的值 (false
),表明之前没有其他人开始处理该节点。在后续尝试处理该节点时,alreadyUsed
将已设置为 true
并且处理将跳过。
上述方法并不理想:一个节点在处理之前可能会被多次添加到队列中。根据图表的形状,这可能是问题,也可能不是问题。
如果这是一个问题 - 我建议使用三值 used
状态:not_visited
、queued
和 processed
.
static constexpr int not_visited = 0;
static constexpr int queued = 1;
static constexpr int processed = 2;
现在,每次我们尝试推送到 que
,每次我们尝试处理节点时,我们都会相应地推进状态。这些改进需要通过 compare_exchange_strong
以原子方式执行,以便每个更改可能只发生一次。调用 compare_exchange_strong
returns true
如果成功(即先前包含的值实际上与 expected
匹配)
void msBFS(bool** arr, int n, vector<int> s, atomic<int>* used) //s is a different
// piece of the original source
{
//used[i] initialized to '0' for all i
queue<int> que;
int empty = 0;
for(auto i = 0; i < s.size(); ++i) {
int expected = not_visited;
//we check it even here, because one thread may be seriously lagging behind others which already entered the while loop
if(used[s[i]].compare_exchange_strong(expected, queued)) {
que.push(s[i]);
}
}
while (!que.empty())
{
int curr = que.front();
que.pop();
int expected = queued;
if(!used[curr].compare_exchange_strong(expected, processed)) {
continue;
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && curr != i) {
int expected = not_visited;
if(used[i].compare_exchange_strong(expected, queued)) {
que.push(i);
}
}
}
}
}
检查性能。有许多原子操作,但它们通常比互斥锁便宜。在内部,mutex 也执行类似于这些的原子操作,但除此之外它可能会完全阻塞一个线程。我展示的代码从不阻塞(线程永远不会停止),所有同步仅在原子变量上完成。
编辑: 第二种方法的一些可能的优化:
- 我意识到,如果转换
not_visited->queued
保证恰好发生一次,则甚至不必执行另一个转换,因为无论如何节点在一个队列中恰好出现一次。因此,您可以节省一些原子操作并再次使用bool
。不过,由于这种情况很少见,我认为它不会产生太大影响。 - 迭代邻居时 - 有一个 if 语句:
if (arr[curr][i] == 1 && curr != i)
- 这不会检查邻居是否被访问过。它仅在稍后通过原子交换进行检查。但是,您可能想看看在if
范围内检查是否有帮助。如果您及早发现used[i]
已经在队列中或已处理,则跳过该分支,并跳过不再需要的原子比较和交换。 - 如果您想压缩处理器的每个滴答声,请考虑对邻接矩阵和
used
数组使用位域,而不是布尔值。我认为对邻居的迭代和条件可以用按位运算来评估,一次 32 bits/neighbors。甚至还有std::atomic<uint32_t>::fetch_or
可以帮助您一次更新 32used
。
Edit2: 第一种方法的可能优化:
每个线程都可以拥有自己的 localUsed
数组,在推送到队列时将对其进行检查并设置为 true
(与您在原始代码中所做的类似)。这对于线程来说是本地的,因此没有原子、互斥等。通过这个简单的检查,您可以保证给定节点最多出现在每个线程的队列中一次。因此,一个节点最多会出现 N 次,其中 N 是线程数。
我认为这是在可伸缩性和内存占用之间值得考虑的折衷,并且可能比第二种方法表现更好。