多源 BFS 多线程

Multi source BFS multithreading

我有一个用邻接矩阵表示的图 arr。以及多个起始顶点的向量 source

我的想法是根据线程数将 source 向量分成 "equal" 部分(如果它没有平均分割,我将剩余部分添加到最后一部分)。并创建 运行 这个函数的线程。 bool used[] 是全局 array

我正在尝试获得(我认为它叫做)"liner" 缩放。我假设起始顶点的数量至少等于线程的数量。

如果我使用互斥量来同步线程,效率会很低。 如果我不这样做,一些顶点会被多次遍历。 问题是否有一种数据结构可以让我删除互斥体? 或其他实现此算法的方法?

mutex m;
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
                                                   // piece of the original source 
{
    queue<int> que;
    for(auto i  = 0; i < s.size(); ++i)
    {
        que.push(s[i]);
        used[s[i]] = true; 
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        cout << curr << " ";
        for (auto i = 0; i < n; ++i)
        {
            lock_guard<mutex> guard(m);
            if (arr[curr][i] == 1 && !used[i] && curr != i)
            {
                que.push(i);
                used[i] = true;
            }
        }
    }
}```

有了 atomic<bool> 我想你快到了。唯一缺少的是原子交换操作。它允许您将读取-修改-写入作为原子操作。对于bool原子类型,通常有一个硬件支持它。

void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
                                                   // piece of the original source 
{
    //used[i] initialized to 'false' for all i
    queue<int> que;
    for(auto i  = 0; i < s.size(); ++i)
    {
        que.push(s[i]);
        //we don't change used just yet!
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        bool alreadyUsed = used[curr].exchange(true);
        if(alreadyUsed) {
            continue; //some other thread already processing it
        }
        cout << curr << " ";
        for (auto i = 0; i < n; ++i) {
            if (arr[curr][i] == 1 && !used[i] && curr != i) {
                que.push(i);
            }
        }
    }
}

注意,有一个逻辑上的变化:当线程开始处理节点时,used[i] 设置为 true,而不是当它被添加到队列时。 在第一次尝试处理节点时,当 used[i] 设置为 true 时,alreadyUsed 将保留先前的值 (false),表明之前没有其他人开始处理该节点。在后续尝试处理该节点时,alreadyUsed 将已设置为 true 并且处理将跳过。

上述方法并不理想:一个节点在处理之前可能会被多次添加到队列中。根据图表的形状,这可能是问题,也可能不是问题。

如果这是一个问题 - 我建议使用三值 used 状态:not_visitedqueuedprocessed.

static constexpr int not_visited = 0;
static constexpr int queued = 1;
static constexpr int processed = 2;

现在,每次我们尝试推送到 que,每次我们尝试处理节点时,我们都会相应地推进状态。这些改进需要通过 compare_exchange_strong 以原子方式执行,以便每个更改可能只发生一次。调用 compare_exchange_strong returns true 如果成功(即先前包含的值实际上与 expected 匹配)

void msBFS(bool** arr, int n, vector<int> s, atomic<int>* used) //s is a different
                                                   // piece of the original source 
{

    //used[i] initialized to '0' for all i
    queue<int> que;
    int empty = 0;
    for(auto i = 0; i < s.size(); ++i) {
        int expected = not_visited;
        //we check it even here, because one thread may be seriously lagging behind others which already entered the while loop
        if(used[s[i]].compare_exchange_strong(expected, queued)) {
            que.push(s[i]);
        }
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        int expected = queued;
        if(!used[curr].compare_exchange_strong(expected, processed)) {
            continue;
        }
        cout << curr << " ";
        for (auto i = 0; i < n; ++i) {
            if (arr[curr][i] == 1 && curr != i) {
                int expected = not_visited;
                if(used[i].compare_exchange_strong(expected, queued)) {
                    que.push(i);
                }                    
            }
        }
    }
}

检查性能。有许多原子操作,但它们通常比互斥锁便宜。在内部,mutex 也执行类似于这些的原子操作,但除此之外它可能会完全阻塞一个线程。我展示的代码从不阻塞(线程永远不会停止),所有同步仅在原子变量上完成。


编辑: 第二种方法的一些可能的优化:

  • 我意识到,如果转换 not_visited->queued 保证恰好发生一次,则甚至不必执行另一个转换,因为无论如何节点在一个队列中恰好出现一次。因此,您可以节省一些原子操作并再次使用 bool。不过,由于这种情况很少见,我认为它不会产生太大影响。
  • 迭代邻居时 - 有一个 if 语句:if (arr[curr][i] == 1 && curr != i) - 这不会检查邻居是否被访问过。它仅在稍后通过原子交换进行检查。但是,您可能想看看在 if 范围内检查是否有帮助。如果您及早发现 used[i] 已经在队列中或已处理,则跳过该分支,并跳过不再需要的原子比较和交换。
  • 如果您想压缩处理器的每个滴答声,请考虑对邻接矩阵和 used 数组使用位域,而不是布尔值。我认为对邻居的迭代和条件可以用按位运算来评估,一次 32 bits/neighbors。甚至还有 std::atomic<uint32_t>::fetch_or 可以帮助您一次更新 32 used

Edit2: 第一种方法的可能优化:

每个线程都可以拥有自己的 localUsed 数组,在推送到队列时将对其进行检查并设置为 true(与您在原始代码中所做的类似)。这对于线程来说是本地的,因此没有原子、互斥等。通过这个简单的检查,您可以保证给定节点最多出现在每个线程的队列中一次。因此,一个节点最多会出现 N 次,其中 N 是线程数。 我认为这是在可伸缩性和内存占用之间值得考虑的折衷,并且可能比第二种方法表现更好。