并行计算大向量的总和
Calculating the sum of a large vector in parallel
问题背景
我有一个程序目前需要很长时间才能使用 std::accumulate
汇总约 1 亿个元素的大 std::vector
,这是一个瓶颈。
我希望它更快,我希望它是一个异步计算,这样 GUI/Server 就不会阻塞。计算也应该使用 多线程 这样我就可以减少求和矢量所需的时间。
我想拆分求和,让每个线程对向量的一部分求和,然后在计算所有部分和时,将每个线程的部分和加在一起得到总和。
Boost.Asio?
我想知道如何在 Boost.Asio 中解决这个问题?我的程序理想情况下需要重用线程(如 线程组 ),不确定如何存储和检索部分和并最终检索部分和的总和。
我正在考虑创建一个调用 boost::asio::io_service::run
的线程组,传递一个处理程序来计算部分总和,但我不确定如何将部分总和传递给另一个处理程序并将所有部分总和相加求和。
如果有人向我展示了一些我可以如何处理的框架代码,那就太好了。
您可以使用 Boost Asio 作为线程池。但它没有太多意义,除非你有...异步 IO 操作来协调。
在这个对“c++ work queues with blocking”的回答中,我展示了两个 thread_pool
实现:
- 解决方案 #1:一个基于
boost::asio::io_service
- 解决方案 #2:另一个基于
boost::thread
原语
两者都接受任何 void()
签名兼容任务。这意味着,您可以将 returns 重要结果的函数包装在 packaged_task<...>
中,并从中获取 future<RetVal>
。
Boost.Asio适合这个问题吗?
Boost.Asio的主要目的是为网络和I/O编程提供异步模型,而且您描述的问题似乎与网络和 I/O.
没有太大关系
我认为最简单的解决方案是使用 Boost 或 C++ 标准库提供的 线程原语。
并行算法
下面是仅使用标准库创建的 accumulate
并行版本的示例。
/* Minimum number of elements for multithreaded algorithm.
Less than this and the algorithm is executed on single thread. */
static const int MT_MIN_SIZE = 10000;
template <typename InputIt, typename T>
auto parallel_accumulate(InputIt first, InputIt last, T init) {
// Determine total size.
const auto size = std::distance(first, last);
// Determine how many parts the work shall be split into.
const auto parts = (size < MT_MIN_SIZE)? 1 : std::thread::hardware_concurrency();
std::vector<std::future<T>> futures;
// For each part, calculate size and run accumulate on a separate thread.
for (std::size_t i = 0; i != parts; ++i) {
const auto part_size = (size * i + size) / parts - (size * i) / parts;
futures.emplace_back(std::async(std::launch::async,
[=] { return std::accumulate(first, std::next(first, part_size), T{}); }));
std::advance(first, part_size);
}
// Wait for all threads to finish execution and accumulate results.
return std::accumulate(std::begin(futures), std::end(futures), init,
[] (const T prev, auto& future) { return prev + future.get(); });
}
Live example (并行版本在 Coliru 上的性能与顺序版本大致相同,可能只有 1 个内核可用)
时间安排
在我的机器上(使用 8 个线程),并行版本的性能平均提高了 ~120%。
Sequential sum:
Time taken: 46 ms
5000000050000000
--------------------------------
Parallel sum:
Time taken: 21 ms
5000000050000000
然而,100,000,000 个元素的绝对增益只是微不足道的(25 毫秒)。虽然,当累积不同的元素类型时,性能增益可能比 int
.
更大
OpenMP
正如@sehe 在评论中提到的,值得一提的是 OpenMP 可能会提供一个简单的解决方案来解决这个问题,例如
template <typename T, typename U>
auto omp_accumulate(const std::vector<T>& v, U init) {
U sum = init;
#pragma omp parallel for reduction(+:sum)
for(std::size_t i = 0; i < v.size(); i++) {
sum += v[i];
}
return sum;
}
在我的机器上,此方法与使用标准线程原语的并行方法执行相同。
Sequential sum:
Time taken: 46 ms
5000000050000000
--------------------------------
Parallel sum:
Time taken: 21 ms
Sum: 5000000050000000
--------------------------------
OpenMP sum:
Time taken: 21 ms
Sum: 5000000050000000
问题背景
我有一个程序目前需要很长时间才能使用 std::accumulate
汇总约 1 亿个元素的大 std::vector
,这是一个瓶颈。
我希望它更快,我希望它是一个异步计算,这样 GUI/Server 就不会阻塞。计算也应该使用 多线程 这样我就可以减少求和矢量所需的时间。
我想拆分求和,让每个线程对向量的一部分求和,然后在计算所有部分和时,将每个线程的部分和加在一起得到总和。
Boost.Asio?
我想知道如何在 Boost.Asio 中解决这个问题?我的程序理想情况下需要重用线程(如 线程组 ),不确定如何存储和检索部分和并最终检索部分和的总和。
我正在考虑创建一个调用 boost::asio::io_service::run
的线程组,传递一个处理程序来计算部分总和,但我不确定如何将部分总和传递给另一个处理程序并将所有部分总和相加求和。
如果有人向我展示了一些我可以如何处理的框架代码,那就太好了。
您可以使用 Boost Asio 作为线程池。但它没有太多意义,除非你有...异步 IO 操作来协调。
在这个对“c++ work queues with blocking”的回答中,我展示了两个 thread_pool
实现:
- 解决方案 #1:一个基于
boost::asio::io_service
- 解决方案 #2:另一个基于
boost::thread
原语
两者都接受任何 void()
签名兼容任务。这意味着,您可以将 returns 重要结果的函数包装在 packaged_task<...>
中,并从中获取 future<RetVal>
。
Boost.Asio适合这个问题吗?
Boost.Asio的主要目的是为网络和I/O编程提供异步模型,而且您描述的问题似乎与网络和 I/O.
没有太大关系我认为最简单的解决方案是使用 Boost 或 C++ 标准库提供的 线程原语。
并行算法
下面是仅使用标准库创建的 accumulate
并行版本的示例。
/* Minimum number of elements for multithreaded algorithm.
Less than this and the algorithm is executed on single thread. */
static const int MT_MIN_SIZE = 10000;
template <typename InputIt, typename T>
auto parallel_accumulate(InputIt first, InputIt last, T init) {
// Determine total size.
const auto size = std::distance(first, last);
// Determine how many parts the work shall be split into.
const auto parts = (size < MT_MIN_SIZE)? 1 : std::thread::hardware_concurrency();
std::vector<std::future<T>> futures;
// For each part, calculate size and run accumulate on a separate thread.
for (std::size_t i = 0; i != parts; ++i) {
const auto part_size = (size * i + size) / parts - (size * i) / parts;
futures.emplace_back(std::async(std::launch::async,
[=] { return std::accumulate(first, std::next(first, part_size), T{}); }));
std::advance(first, part_size);
}
// Wait for all threads to finish execution and accumulate results.
return std::accumulate(std::begin(futures), std::end(futures), init,
[] (const T prev, auto& future) { return prev + future.get(); });
}
Live example (并行版本在 Coliru 上的性能与顺序版本大致相同,可能只有 1 个内核可用)
时间安排
在我的机器上(使用 8 个线程),并行版本的性能平均提高了 ~120%。
Sequential sum:
Time taken: 46 ms
5000000050000000
--------------------------------
Parallel sum:
Time taken: 21 ms
5000000050000000
然而,100,000,000 个元素的绝对增益只是微不足道的(25 毫秒)。虽然,当累积不同的元素类型时,性能增益可能比 int
.
OpenMP
正如@sehe 在评论中提到的,值得一提的是 OpenMP 可能会提供一个简单的解决方案来解决这个问题,例如
template <typename T, typename U>
auto omp_accumulate(const std::vector<T>& v, U init) {
U sum = init;
#pragma omp parallel for reduction(+:sum)
for(std::size_t i = 0; i < v.size(); i++) {
sum += v[i];
}
return sum;
}
在我的机器上,此方法与使用标准线程原语的并行方法执行相同。
Sequential sum:
Time taken: 46 ms
5000000050000000
--------------------------------
Parallel sum:
Time taken: 21 ms
Sum: 5000000050000000
--------------------------------
OpenMP sum:
Time taken: 21 ms
Sum: 5000000050000000