使用 std::async 并将向量中的参数传递给函数并收集结果

Using std::async and pass arguments in a vector to a function and gather results

我想让这个函数并行计算:

#include <iostream>
#include <vector>

int compute_something(int i, int j) {
    return i*j;
}

int main() {

    auto params = std::vector<int>(1000,5);
    std::vector<int> results;
    for (auto i : params)
        results.push_back(compute_something(i,4));

    return 0;
}

我将参数从列表传递给函数,并希望按顺序返回结果。我想使用异步并行化,因为 compute_something() 将花费不同的时间,具体取决于输入。而且输入向量远大于核数。

在 C++17 中,标准算法以并行版本提供。您指定 execution policystd::seq)、并行(std::par)或并行和矢量化(std::par_unseq),它将在后台为您执行多线程处理。

所以对于你想做的事情,你可以使用 std::transform 和 lambda 函数来捕获你想对输入向量的每个元素执行的操作,并将结果放在 results 向量(大小必须相同):

#include <execution>
#include <algorithm>
#include <vector>

int compute_something(int i, int j) {
    return i * j;
}

int main()
{
    auto params = std::vector<int>(1000, 5);
    std::vector<int> results(1000, 0);
    std::transform(std::execution::par_unseq, params.begin(), params.end(),
        results.begin(), [](int i) { return compute_something(i, 4); }
    );
}

当然,可以将计算嵌入到 lambda 中,以便像 compute_something 中那样进行简单的计算。那么代码就变成了:

std::transform(std::execution::par_unseq, params.begin(), params.end(),
        results.begin(), [](int i) { return i * 4; }

并不是所有的编译器都实现了执行策略。因此,如果您的编译器不支持它,您可以采用另一种方式:使用 std::async 并以块的形式处理输入向量。为此,您必须定义一个接受迭代器和 returns 结果向量的新函数。然后你可以在最后合并结果。

示例:

#include <future>
#include <vector>

using Iter = std::vector<int>::iterator;

std::vector<int> parallel_compute(Iter beg, Iter end)
{
    std::vector<int> results;
    //Reserve memory to avoid reallocations
    auto size = std::distance(beg, end);
    results.reserve(size);

    for (Iter it = beg; it != end; ++it)
    {
        results.push_back(*it * 4); //Add result to vector
    }

    return results;
}

int main()
{
    const int Size = 1000;
    //Chunk size
    const int Half = Size / 2;
    //Input vector
    auto params = std::vector<int>(Size, 5);
    //Create futures
    auto fut1 = std::async(std::launch::async, parallel_compute, params.begin(), params.begin()+ Half);
    auto fut2 = std::async(std::launch::async, parallel_compute, params.begin()+ Half, params.end());
    //Get results
    auto res1 = fut1.get();
    auto res2 = fut2.get();
    //Combine results into one vector
    std::vector<int> results;
    results.insert(results.end(), res1.begin(), res1.end());
    results.insert(results.end(), res2.begin(), res2.end());
}

launch::async 策略将确保创建两个线程。但是,我不会创建太多线程——每个内核一个线程是一种合理的策略。您可以使用 std::thread::hardware_concurrency() 来获取系统支持的并发线程数。创建线程并管理它们会引入一些开销,如果创建太多线程可能会适得其反。


编辑:

为了避免对单个小向量进行昂贵的分配,我们可以在开始时创建一个结果向量,并将迭代器传递给 parallel_compute 的每个并行调用的结果范围。由于每个线程将访问结果向量的不同部分,我们不需要同步:

#include <future>
#include <vector>

using Iter = std::vector<int>::iterator;

void parallel_compute(Iter beg, Iter end, Iter outBeg)
{
    for (Iter it = beg; it != end; ++it)
    {
        *outBeg++ = (*it * 4); //Add result to vector
    }
}

int main()
{
    const int Size = 1000;
    //Chunk size
    const int Half = Size / 2;
    //Input vector
    auto params = std::vector<int>(Size, 5);
    //Output vector
    std::vector<int> results(Size, 0);
    //Create futures
    auto fut1 = std::async(std::launch::async, parallel_compute, params.begin(), params.begin() + Half, results.begin());
    auto fut2 = std::async(std::launch::async, parallel_compute, params.begin() + Half, params.end(), results.begin() + Half);
    //Get results
    fut1.wait();
    fut2.wait();
}