并行快速处理 std::bitset<65536>
Fast process std::bitset<65536> in parallel
曾经有一个被删除的question,我写了一个巨大的答案,但是这个问题被删除了,作者拒绝取消删除。
所以在这里发布这个问题的简短摘要。并立即亲自回答这个问题,只是为了分享我的结果。
问题是,如果我们给定 std::bitset<65536>
在内循环中逐位处理(通过某些公式),那么我们如何提高计算量?
外层循环只是调用了内层循环很多次(比如说50 000次),外层循环不能并行处理,因为每次下一次迭代都取决于上一次迭代的结果。
这个过程的示例代码:
std::bitset<65536> bits{};
uint64_t hash = 0;
for (size_t i = 0; i < 50000; ++i) {
// Process Bits
for (size_t j = 0; j < bits.size(); ++j)
bits[j] = ModifyBit(i, j, hash, bits[j]);
hash = Hash(bits, hash);
}
以上代码只是一种示例处理方式,并非真实案例。实际情况是这样的,很多时候我们以某种方式处理 std::bitset<65536>
,所有位都可以独立处理。
问题是我们如何在内循环中尽可能快地并行处理位。
一个重要的注意修改位的公式是通用的,这意味着我们事先不知道它并且不能从中做出SIMD指令.
但我们知道所有位都可以独立处理。而且我们需要并行处理这个过程。我们也不能并行化外部循环,因为它的每次迭代都取决于前一次迭代的结果。
另一个注意是std::bitset<65536>很小,只有1K的64位字。因此,这意味着直接使用 std::thread of std::async 线程的池将不起作用,因为每个线程的工作将只有 50-200 纳秒左右,启动和停止线程以及将工作发送到的时间非常短他们。即使 std::mutex 在我的 Windows 机器上也需要 75 纳秒(尽管在 Linux 上需要 20 纳秒),所以使用 std::mutex 也是一个很大的开销。
人们可能会假设上面的 ModifyBit()
函数对每个位花费大约相同的时间,否则就无法理解如何安排循环的平衡并行化,只能将其切成许多小任务,希望较长的任务将被几个较短的任务所平衡。
为您的任务实施了相当庞大和复杂的解决方案,但运行速度非常快。在我的 4 核(8 个硬件线程)笔记本电脑上,与单线程版本(您的代码版本)相比,我有 6x
倍 multi-core 的加速。
以下解决方案的主要思想是为 运行ning 具有小开销的任意任务实现非常快速的多核 Thread-Pool。我的实现每秒最多可以处理 1-1000 万个任务(取决于 CPU 速度和内核数)。
异步启动多个任务的常规方法是使用 std::async or just by creating std::thread。这两种方式都比我自己的实现慢得多。他们不能像我的实现那样提供每秒 500 万个任务的吞吐量。并且您的代码需要每秒执行数百万个任务才能达到 运行 以获得良好的速度。这就是为什么我从头开始实施所有内容的原因。
现在实现快速线程池后,我们可以将您的 64K 位集分割成更小的 sub-sets 并并行处理这些 sub-sets。我把64K的bitset分成了16等份(见代码中的BitSize / 16
),你可以设置其他的份量等于2的幂,但不要太多,否则线程池开销会太大。通常最好分割成等于硬件线程数量两倍(或内核数量的 4 倍)的部分数量。
我用 C++ 代码实现了几个 classes。 AtomicMutex
class 使用 std::atomic_flag 来快速替换基于 spin-locking 方法的互斥体。此 AtomicMutex 用于保护为线程池上的 运行ning 提交的任务队列。
RingBuffer
class基于std::vector实现了简单快速的队列来存储任何对象。它是使用两个指向向量的指针(头和尾)实现的。当新元素添加到队列时,尾指针向右前进,如果该指针到达向量的末尾,则它环绕到第 0 个位置。同样,当元素从队列中取出时,头指针也向右前进并环绕。 RingBuffer用于存放线程池任务
Queue
class 是 RingBuffer 的包装器,但具有 AtomicMutex 保护。此 spin-lock 互斥体用于保护同步 adding/taking 元素 to/from 队列免受多个工作线程的影响。
Pool
实现 multi-core 任务池本身。它创建的工作线程数与 CPU 硬件线程数(内核数量的两倍)减去一个数一样多。每个工作线程只是从队列中轮询新任务并立即执行它们。主线程将新任务添加到队列中。 Pool 也有等待所有当前任务完成的 Wait() 能力,这个等待被用作等待整个 64K 位集被处理(所有 sub-parts 被处理)的屏障。 Pool 接受任何 lambda(函数闭包)为 运行。你可以看到 64K 位集被分割成更小的部分是通过 pool.Emplace(lambda)
处理的,后来 pool.Wait()
用于等待所有 sub-parts 完成。如果有任何错误,池工作人员的异常将被收集并报告给用户。在主线程内执行 Wait() 池 运行s 任务时,不要浪费一个核心来等待任务完成。
控制台中报告的时间由 std::chrono 模块完成。
可以 运行 两个版本 - single-threaded(您的原始版本)和 multi-threaded 使用所有内核。 single/multi 之间的切换是通过将 MultiThreaded = true
模板参数传递给函数 ProcessBitset()
.
来完成的
#include <cstdint>
#include <atomic>
#include <vector>
#include <array>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <exception>
#include <optional>
#include <memory>
#include <iostream>
#include <iomanip>
#include <bitset>
#include <string>
#include <chrono>
#include <algorithm>
#include <any>
#include <type_traits>
class AtomicMutex {
class LockerC;
public:
void lock() {
while (f_.test_and_set(std::memory_order_acquire))
//f_.wait(true, std::memory_order_acquire)
;
}
void unlock() {
f_.clear(std::memory_order_release);
//f_.notify_all();
}
LockerC Locker() { return LockerC(*this); }
private:
class LockerC {
public:
LockerC() = delete;
LockerC(AtomicMutex & mux) : pmux_(&mux) { mux.lock(); }
LockerC(LockerC const & other) = delete;
LockerC(LockerC && other) : pmux_(other.pmux_) { other.pmux_ = nullptr; }
~LockerC() { if (pmux_) pmux_->unlock(); }
LockerC & operator = (LockerC const & other) = delete;
LockerC & operator = (LockerC && other) = delete;
private:
AtomicMutex * pmux_ = nullptr;
};
std::atomic_flag f_ = ATOMIC_FLAG_INIT;
};
template <typename T>
class RingBuffer {
public:
RingBuffer() : buf_(1 << 8), last_(buf_.size() - 1) {}
T & front() { return buf_[first_]; }
T const & front() const { return buf_[first_]; }
T & back() { return buf_[last_]; }
T const & back() const { return buf_[last_]; }
size_t size() const { return size_; }
bool empty() const { return size_ == 0; }
template <typename ... Args>
void emplace(Args && ... args) {
while (size_ >= buf_.size()) {
std::rotate(&buf_[0], &buf_[first_], &buf_[buf_.size()]);
first_ = 0;
last_ = buf_.size() - 1;
buf_.resize(buf_.size() * 2);
}
++size_;
++last_;
if (last_ >= buf_.size())
last_ = 0;
buf_[last_] = T(std::forward<Args>(args)...);
}
void pop() {
if (size_ == 0)
return;
--size_;
++first_;
if (first_ >= buf_.size())
first_ = 0;
}
private:
std::vector<T> buf_;
size_t first_ = 0, last_ = 0, size_ = 0;
};
template <typename T>
class Queue {
public:
size_t Size() const { return q_.size(); }
bool Empty() const { return q_.size() == 0; }
template <typename ... Args>
void Emplace(Args && ... args) {
auto lock = m_.Locker();
q_.emplace(std::forward<Args>(args)...);
}
T Pop(std::function<void()> const & on_empty = []{},
std::function<void()> const & on_full = []{}) {
while (true) {
if (q_.empty()) {
on_empty();
continue;
}
auto lock = m_.Locker();
if (q_.empty()) {
on_empty();
continue;
}
on_full();
T val = std::move(q_.front());
q_.pop();
return std::move(val);
}
}
std::optional<T> TryPop() {
auto lock = m_.Locker();
if (q_.empty())
return std::nullopt;
T val = std::move(q_.front());
q_.pop();
return std::move(val);
}
private:
AtomicMutex m_;
RingBuffer<T> q_;
};
class RunInDestr {
public:
RunInDestr(std::function<void()> const & f) : f_(f) {}
~RunInDestr() { f_(); }
private:
std::function<void()> const & f_;
};
class Pool {
public:
struct FinishExc {};
struct Worker {
std::unique_ptr<std::atomic<bool>> pdone = std::make_unique<std::atomic<bool>>(true);
std::unique_ptr<std::exception_ptr> pexc = std::make_unique<std::exception_ptr>();
std::unique_ptr<std::thread> thr;
};
Pool(size_t nthreads = size_t(-1)) {
if (nthreads == size_t(-1))
nthreads = std::thread::hardware_concurrency() - 1;
std::cout << "Pool has " << nthreads << " worker threads." << std::endl;
for (size_t i = 0; i < nthreads; ++i) {
workers_.emplace_back(Worker{});
workers_.back().thr = std::make_unique<std::thread>(
[&, pdone = workers_.back().pdone.get(), pexc = workers_.back().pexc.get()]{
try {
std::function<void()> f_done = [pdone]{
pdone->store(true, std::memory_order_relaxed);
}, f_empty = [this]{
CheckFinish();
}, f_full = [pdone]{
pdone->store(false, std::memory_order_relaxed);
};
while (true) {
RunInDestr set_done(f_done);
tasks_.Pop(f_empty, f_full)();
}
} catch (...) {
exc_.store(true, std::memory_order_relaxed);
*pexc = std::current_exception();
}
});
}
}
~Pool() {
Wait();
Finish();
}
void CheckExc() {
if (!exc_.load(std::memory_order_relaxed))
return;
Finish();
throw std::runtime_error("Pool: Exception occured!");
}
void Finish() {
finish_ = true;
for (auto & w: workers_)
try {
w.thr->join();
if (*w.pexc)
std::rethrow_exception(*w.pexc);
} catch (FinishExc const &) {}
workers_.clear();
}
template <typename ... Args>
void Emplace(Args && ... args) {
CheckExc();
tasks_.Emplace(std::forward<Args>(args)...);
}
void Wait() {
while (true) {
auto task = tasks_.TryPop();
if (!task)
break;
(*task)();
}
while (true) {
bool done = true;
for (auto & w: workers_)
if (!w.pdone->load(std::memory_order_relaxed)) {
done = false;
break;
}
if (done)
break;
}
CheckExc();
}
private:
void CheckFinish() {
if (finish_)
throw FinishExc{};
}
Queue<std::function<void()>> tasks_;
std::vector<Worker> workers_;
bool finish_ = false;
std::atomic<bool> exc_ = false;
};
template <bool MultiThreaded = true, size_t BitSize>
void ProcessBitset(Pool & pool, std::bitset<BitSize> & bset,
std::string const & businessLogicCriteria) {
static size_t constexpr block = BitSize / 16;
for (int j = 0; j < BitSize; j += block) {
auto task = [&bset, j]{
int const hi = std::min(j + block, BitSize);
for (int i = j; i < hi; ++i) {
if (i % 2 == 0)
bset[i] = 0;
else
bset[i] = 1;
}
};
if constexpr(MultiThreaded)
pool.Emplace(std::move(task));
else
task();
}
if constexpr(MultiThreaded)
pool.Wait();
}
static auto const gtb = std::chrono::high_resolution_clock::now();
double Time() {
return std::chrono::duration_cast<std::chrono::duration<double>>(
std::chrono::high_resolution_clock::now() - gtb).count();
}
void Compute() {
Pool pool;
std::bitset<65536> bset;
std::string businessLogicCriteria;
int const hi = 50000;
for (int j = 0; j < hi; ++j) {
if ((j & 0x1FFF) == 0 || j + 1 >= hi)
std::cout << j / 1000 << "K (" << std::fixed << std::setprecision(3) << Time() << " sec), " << std::flush;
ProcessBitset(pool, bset, businessLogicCriteria);
businessLogicCriteria = "...";
}
}
void TimeMeasure() {
size_t constexpr A = 1 << 16, B = 1 << 5;
{
Pool pool;
auto const tb = Time();
int64_t volatile x = 0;
for (size_t i = 0; i < A; ++i) {
for (size_t j = 0; j < B; ++j)
pool.Emplace([&]{ x = x + 1; });
pool.Wait();
}
std::cout << "AtomicPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
<< " sec, speed " << A * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
<< 1'000'000 / (A * B / (Time() - tb)) << " sec/M-task, no-collisions "
<< std::setprecision(7) << double(x) / (A * B) << std::endl;
}
{
auto const tb = Time();
//size_t const nthr = std::thread::hardware_concurrency();
size_t constexpr C = A / 8;
std::vector<std::future<void>> asyncs;
int64_t volatile x = 0;
for (size_t i = 0; i < C; ++i) {
asyncs.clear();
for (size_t j = 0; j < B; ++j)
asyncs.emplace_back(std::async(std::launch::async, [&]{ x = x + 1; }));
asyncs.clear();
}
std::cout << "AsyncPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
<< " sec, speed " << C * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
<< 1'000'000 / (C * B / (Time() - tb)) << " sec/M-task, no-collisions "
<< std::setprecision(7) << double(x) / (C * B) << std::endl;
}
}
int main() {
try {
TimeMeasure();
Compute();
return 0;
} catch (std::exception const & ex) {
std::cout << "Exception: " << ex.what() << std::endl;
return -1;
} catch (...) {
std::cout << "Unknown Exception!" << std::endl;
return -1;
}
}
4 核(8 个硬件线程)的输出:
Pool has 7 worker threads.
AtomicPool time 0.903 sec, speed 2321.831 empty K-tasks/sec, 0.431 sec/M-task, no-collisions 0.9999967
AsyncPool time 0.982 sec, speed 266.789 empty K-tasks/sec, 3.750 sec/M-task, no-collisions 0.9999123
Pool has 7 worker threads.
0K (0.074 sec), 8K (0.670 sec), 16K (1.257 sec), 24K (1.852 sec), 32K (2.435 sec), 40K (2.984 sec), 49K (3.650 sec), 49K (3.711 sec),
下面的比较是 single-threaded 版本时间,即慢 6x
倍:
0K (0.125 sec), 8K (3.786 sec), 16K (7.754 sec), 24K (11.202 sec), 32K (14.662 sec), 40K (18.056 sec), 49K (21.470 sec), 49K (21.841 sec),
您有想要并行化的内部循环:
for (size_t j = 0; j < bits.size(); ++j)
bits[j] = ModifyBit(i, j, hash, bits[j]);
所以一个好主意是将它分成块,让多个线程并行处理每个块。您可以使用 std::atomic<int>
计数器轻松地将块提交给工作人员,该计数器递增以识别要处理的块。您还可以确保线程在一个循环后全部停止工作,然后再使用 std::barrier
:
开始下一个循环
std::bitset<65536> bits{};
std::thread pool[8]; // Change size accordingly
std::atomic<int> task_number{0};
constexpr std::size_t tasks_per_loop = 32; // Arbitrarily chosen
constexpr std::size_t block_size = (bits.size()+tasks_per_loop-1) / tasks_per_loop;
// (only written to by one thread by the barrier, so not atomic)
uint64_t hash = 0;
int i = 0;
std::barrier barrier(std::size(pool), [&]() {
task_number = 0;
++i;
hash = Hash(bits, hash);
});
for (std::thread& t : pool) {
t = std::thread([&]{
while (i < 50000) {
for (int t; (t = task_number++) < tasks_per_loop;) {
int block_start = t * block_size;
int block_end = std::min(block_start + block_size, bits.size());
for (int j = block_start; j < block_end; ++j) {
bits[j] = ModifyBit(i, j, hash, bits[j]);
}
}
// Wait for other threads to finish and hash
// to be calculated before starting next loop
barrier.arrive_and_wait();
}
});
}
for (std::thread& t : pool) t.join();
(使用 OpenMP #pragma omp parallel for
并行化 for 循环的看似简单的方法在一些测试中似乎较慢,可能是因为任务太小了)
这里是针对你的实现运行类似的代码:https://godbolt.org/z/en76Kv4nn
在我的机器上,运行 这几次 100 万次迭代用我的方法花费了 28 到 32 秒,而用你的一般线程池方法花费了 44 到 50 秒(当然这是不那么普遍的因为它不能执行任意 std::function<void()>
任务。
曾经有一个被删除的question,我写了一个巨大的答案,但是这个问题被删除了,作者拒绝取消删除。
所以在这里发布这个问题的简短摘要。并立即亲自回答这个问题,只是为了分享我的结果。
问题是,如果我们给定 std::bitset<65536>
在内循环中逐位处理(通过某些公式),那么我们如何提高计算量?
外层循环只是调用了内层循环很多次(比如说50 000次),外层循环不能并行处理,因为每次下一次迭代都取决于上一次迭代的结果。
这个过程的示例代码:
std::bitset<65536> bits{};
uint64_t hash = 0;
for (size_t i = 0; i < 50000; ++i) {
// Process Bits
for (size_t j = 0; j < bits.size(); ++j)
bits[j] = ModifyBit(i, j, hash, bits[j]);
hash = Hash(bits, hash);
}
以上代码只是一种示例处理方式,并非真实案例。实际情况是这样的,很多时候我们以某种方式处理 std::bitset<65536>
,所有位都可以独立处理。
问题是我们如何在内循环中尽可能快地并行处理位。
一个重要的注意修改位的公式是通用的,这意味着我们事先不知道它并且不能从中做出SIMD指令.
但我们知道所有位都可以独立处理。而且我们需要并行处理这个过程。我们也不能并行化外部循环,因为它的每次迭代都取决于前一次迭代的结果。
另一个注意是std::bitset<65536>很小,只有1K的64位字。因此,这意味着直接使用 std::thread of std::async 线程的池将不起作用,因为每个线程的工作将只有 50-200 纳秒左右,启动和停止线程以及将工作发送到的时间非常短他们。即使 std::mutex 在我的 Windows 机器上也需要 75 纳秒(尽管在 Linux 上需要 20 纳秒),所以使用 std::mutex 也是一个很大的开销。
人们可能会假设上面的 ModifyBit()
函数对每个位花费大约相同的时间,否则就无法理解如何安排循环的平衡并行化,只能将其切成许多小任务,希望较长的任务将被几个较短的任务所平衡。
为您的任务实施了相当庞大和复杂的解决方案,但运行速度非常快。在我的 4 核(8 个硬件线程)笔记本电脑上,与单线程版本(您的代码版本)相比,我有 6x
倍 multi-core 的加速。
以下解决方案的主要思想是为 运行ning 具有小开销的任意任务实现非常快速的多核 Thread-Pool。我的实现每秒最多可以处理 1-1000 万个任务(取决于 CPU 速度和内核数)。
异步启动多个任务的常规方法是使用 std::async or just by creating std::thread。这两种方式都比我自己的实现慢得多。他们不能像我的实现那样提供每秒 500 万个任务的吞吐量。并且您的代码需要每秒执行数百万个任务才能达到 运行 以获得良好的速度。这就是为什么我从头开始实施所有内容的原因。
现在实现快速线程池后,我们可以将您的 64K 位集分割成更小的 sub-sets 并并行处理这些 sub-sets。我把64K的bitset分成了16等份(见代码中的BitSize / 16
),你可以设置其他的份量等于2的幂,但不要太多,否则线程池开销会太大。通常最好分割成等于硬件线程数量两倍(或内核数量的 4 倍)的部分数量。
我用 C++ 代码实现了几个 classes。 AtomicMutex
class 使用 std::atomic_flag 来快速替换基于 spin-locking 方法的互斥体。此 AtomicMutex 用于保护为线程池上的 运行ning 提交的任务队列。
RingBuffer
class基于std::vector实现了简单快速的队列来存储任何对象。它是使用两个指向向量的指针(头和尾)实现的。当新元素添加到队列时,尾指针向右前进,如果该指针到达向量的末尾,则它环绕到第 0 个位置。同样,当元素从队列中取出时,头指针也向右前进并环绕。 RingBuffer用于存放线程池任务
Queue
class 是 RingBuffer 的包装器,但具有 AtomicMutex 保护。此 spin-lock 互斥体用于保护同步 adding/taking 元素 to/from 队列免受多个工作线程的影响。
Pool
实现 multi-core 任务池本身。它创建的工作线程数与 CPU 硬件线程数(内核数量的两倍)减去一个数一样多。每个工作线程只是从队列中轮询新任务并立即执行它们。主线程将新任务添加到队列中。 Pool 也有等待所有当前任务完成的 Wait() 能力,这个等待被用作等待整个 64K 位集被处理(所有 sub-parts 被处理)的屏障。 Pool 接受任何 lambda(函数闭包)为 运行。你可以看到 64K 位集被分割成更小的部分是通过 pool.Emplace(lambda)
处理的,后来 pool.Wait()
用于等待所有 sub-parts 完成。如果有任何错误,池工作人员的异常将被收集并报告给用户。在主线程内执行 Wait() 池 运行s 任务时,不要浪费一个核心来等待任务完成。
控制台中报告的时间由 std::chrono 模块完成。
可以 运行 两个版本 - single-threaded(您的原始版本)和 multi-threaded 使用所有内核。 single/multi 之间的切换是通过将 MultiThreaded = true
模板参数传递给函数 ProcessBitset()
.
#include <cstdint>
#include <atomic>
#include <vector>
#include <array>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <exception>
#include <optional>
#include <memory>
#include <iostream>
#include <iomanip>
#include <bitset>
#include <string>
#include <chrono>
#include <algorithm>
#include <any>
#include <type_traits>
class AtomicMutex {
class LockerC;
public:
void lock() {
while (f_.test_and_set(std::memory_order_acquire))
//f_.wait(true, std::memory_order_acquire)
;
}
void unlock() {
f_.clear(std::memory_order_release);
//f_.notify_all();
}
LockerC Locker() { return LockerC(*this); }
private:
class LockerC {
public:
LockerC() = delete;
LockerC(AtomicMutex & mux) : pmux_(&mux) { mux.lock(); }
LockerC(LockerC const & other) = delete;
LockerC(LockerC && other) : pmux_(other.pmux_) { other.pmux_ = nullptr; }
~LockerC() { if (pmux_) pmux_->unlock(); }
LockerC & operator = (LockerC const & other) = delete;
LockerC & operator = (LockerC && other) = delete;
private:
AtomicMutex * pmux_ = nullptr;
};
std::atomic_flag f_ = ATOMIC_FLAG_INIT;
};
template <typename T>
class RingBuffer {
public:
RingBuffer() : buf_(1 << 8), last_(buf_.size() - 1) {}
T & front() { return buf_[first_]; }
T const & front() const { return buf_[first_]; }
T & back() { return buf_[last_]; }
T const & back() const { return buf_[last_]; }
size_t size() const { return size_; }
bool empty() const { return size_ == 0; }
template <typename ... Args>
void emplace(Args && ... args) {
while (size_ >= buf_.size()) {
std::rotate(&buf_[0], &buf_[first_], &buf_[buf_.size()]);
first_ = 0;
last_ = buf_.size() - 1;
buf_.resize(buf_.size() * 2);
}
++size_;
++last_;
if (last_ >= buf_.size())
last_ = 0;
buf_[last_] = T(std::forward<Args>(args)...);
}
void pop() {
if (size_ == 0)
return;
--size_;
++first_;
if (first_ >= buf_.size())
first_ = 0;
}
private:
std::vector<T> buf_;
size_t first_ = 0, last_ = 0, size_ = 0;
};
template <typename T>
class Queue {
public:
size_t Size() const { return q_.size(); }
bool Empty() const { return q_.size() == 0; }
template <typename ... Args>
void Emplace(Args && ... args) {
auto lock = m_.Locker();
q_.emplace(std::forward<Args>(args)...);
}
T Pop(std::function<void()> const & on_empty = []{},
std::function<void()> const & on_full = []{}) {
while (true) {
if (q_.empty()) {
on_empty();
continue;
}
auto lock = m_.Locker();
if (q_.empty()) {
on_empty();
continue;
}
on_full();
T val = std::move(q_.front());
q_.pop();
return std::move(val);
}
}
std::optional<T> TryPop() {
auto lock = m_.Locker();
if (q_.empty())
return std::nullopt;
T val = std::move(q_.front());
q_.pop();
return std::move(val);
}
private:
AtomicMutex m_;
RingBuffer<T> q_;
};
class RunInDestr {
public:
RunInDestr(std::function<void()> const & f) : f_(f) {}
~RunInDestr() { f_(); }
private:
std::function<void()> const & f_;
};
class Pool {
public:
struct FinishExc {};
struct Worker {
std::unique_ptr<std::atomic<bool>> pdone = std::make_unique<std::atomic<bool>>(true);
std::unique_ptr<std::exception_ptr> pexc = std::make_unique<std::exception_ptr>();
std::unique_ptr<std::thread> thr;
};
Pool(size_t nthreads = size_t(-1)) {
if (nthreads == size_t(-1))
nthreads = std::thread::hardware_concurrency() - 1;
std::cout << "Pool has " << nthreads << " worker threads." << std::endl;
for (size_t i = 0; i < nthreads; ++i) {
workers_.emplace_back(Worker{});
workers_.back().thr = std::make_unique<std::thread>(
[&, pdone = workers_.back().pdone.get(), pexc = workers_.back().pexc.get()]{
try {
std::function<void()> f_done = [pdone]{
pdone->store(true, std::memory_order_relaxed);
}, f_empty = [this]{
CheckFinish();
}, f_full = [pdone]{
pdone->store(false, std::memory_order_relaxed);
};
while (true) {
RunInDestr set_done(f_done);
tasks_.Pop(f_empty, f_full)();
}
} catch (...) {
exc_.store(true, std::memory_order_relaxed);
*pexc = std::current_exception();
}
});
}
}
~Pool() {
Wait();
Finish();
}
void CheckExc() {
if (!exc_.load(std::memory_order_relaxed))
return;
Finish();
throw std::runtime_error("Pool: Exception occured!");
}
void Finish() {
finish_ = true;
for (auto & w: workers_)
try {
w.thr->join();
if (*w.pexc)
std::rethrow_exception(*w.pexc);
} catch (FinishExc const &) {}
workers_.clear();
}
template <typename ... Args>
void Emplace(Args && ... args) {
CheckExc();
tasks_.Emplace(std::forward<Args>(args)...);
}
void Wait() {
while (true) {
auto task = tasks_.TryPop();
if (!task)
break;
(*task)();
}
while (true) {
bool done = true;
for (auto & w: workers_)
if (!w.pdone->load(std::memory_order_relaxed)) {
done = false;
break;
}
if (done)
break;
}
CheckExc();
}
private:
void CheckFinish() {
if (finish_)
throw FinishExc{};
}
Queue<std::function<void()>> tasks_;
std::vector<Worker> workers_;
bool finish_ = false;
std::atomic<bool> exc_ = false;
};
template <bool MultiThreaded = true, size_t BitSize>
void ProcessBitset(Pool & pool, std::bitset<BitSize> & bset,
std::string const & businessLogicCriteria) {
static size_t constexpr block = BitSize / 16;
for (int j = 0; j < BitSize; j += block) {
auto task = [&bset, j]{
int const hi = std::min(j + block, BitSize);
for (int i = j; i < hi; ++i) {
if (i % 2 == 0)
bset[i] = 0;
else
bset[i] = 1;
}
};
if constexpr(MultiThreaded)
pool.Emplace(std::move(task));
else
task();
}
if constexpr(MultiThreaded)
pool.Wait();
}
static auto const gtb = std::chrono::high_resolution_clock::now();
double Time() {
return std::chrono::duration_cast<std::chrono::duration<double>>(
std::chrono::high_resolution_clock::now() - gtb).count();
}
void Compute() {
Pool pool;
std::bitset<65536> bset;
std::string businessLogicCriteria;
int const hi = 50000;
for (int j = 0; j < hi; ++j) {
if ((j & 0x1FFF) == 0 || j + 1 >= hi)
std::cout << j / 1000 << "K (" << std::fixed << std::setprecision(3) << Time() << " sec), " << std::flush;
ProcessBitset(pool, bset, businessLogicCriteria);
businessLogicCriteria = "...";
}
}
void TimeMeasure() {
size_t constexpr A = 1 << 16, B = 1 << 5;
{
Pool pool;
auto const tb = Time();
int64_t volatile x = 0;
for (size_t i = 0; i < A; ++i) {
for (size_t j = 0; j < B; ++j)
pool.Emplace([&]{ x = x + 1; });
pool.Wait();
}
std::cout << "AtomicPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
<< " sec, speed " << A * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
<< 1'000'000 / (A * B / (Time() - tb)) << " sec/M-task, no-collisions "
<< std::setprecision(7) << double(x) / (A * B) << std::endl;
}
{
auto const tb = Time();
//size_t const nthr = std::thread::hardware_concurrency();
size_t constexpr C = A / 8;
std::vector<std::future<void>> asyncs;
int64_t volatile x = 0;
for (size_t i = 0; i < C; ++i) {
asyncs.clear();
for (size_t j = 0; j < B; ++j)
asyncs.emplace_back(std::async(std::launch::async, [&]{ x = x + 1; }));
asyncs.clear();
}
std::cout << "AsyncPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
<< " sec, speed " << C * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
<< 1'000'000 / (C * B / (Time() - tb)) << " sec/M-task, no-collisions "
<< std::setprecision(7) << double(x) / (C * B) << std::endl;
}
}
int main() {
try {
TimeMeasure();
Compute();
return 0;
} catch (std::exception const & ex) {
std::cout << "Exception: " << ex.what() << std::endl;
return -1;
} catch (...) {
std::cout << "Unknown Exception!" << std::endl;
return -1;
}
}
4 核(8 个硬件线程)的输出:
Pool has 7 worker threads.
AtomicPool time 0.903 sec, speed 2321.831 empty K-tasks/sec, 0.431 sec/M-task, no-collisions 0.9999967
AsyncPool time 0.982 sec, speed 266.789 empty K-tasks/sec, 3.750 sec/M-task, no-collisions 0.9999123
Pool has 7 worker threads.
0K (0.074 sec), 8K (0.670 sec), 16K (1.257 sec), 24K (1.852 sec), 32K (2.435 sec), 40K (2.984 sec), 49K (3.650 sec), 49K (3.711 sec),
下面的比较是 single-threaded 版本时间,即慢 6x
倍:
0K (0.125 sec), 8K (3.786 sec), 16K (7.754 sec), 24K (11.202 sec), 32K (14.662 sec), 40K (18.056 sec), 49K (21.470 sec), 49K (21.841 sec),
您有想要并行化的内部循环:
for (size_t j = 0; j < bits.size(); ++j)
bits[j] = ModifyBit(i, j, hash, bits[j]);
所以一个好主意是将它分成块,让多个线程并行处理每个块。您可以使用 std::atomic<int>
计数器轻松地将块提交给工作人员,该计数器递增以识别要处理的块。您还可以确保线程在一个循环后全部停止工作,然后再使用 std::barrier
:
std::bitset<65536> bits{};
std::thread pool[8]; // Change size accordingly
std::atomic<int> task_number{0};
constexpr std::size_t tasks_per_loop = 32; // Arbitrarily chosen
constexpr std::size_t block_size = (bits.size()+tasks_per_loop-1) / tasks_per_loop;
// (only written to by one thread by the barrier, so not atomic)
uint64_t hash = 0;
int i = 0;
std::barrier barrier(std::size(pool), [&]() {
task_number = 0;
++i;
hash = Hash(bits, hash);
});
for (std::thread& t : pool) {
t = std::thread([&]{
while (i < 50000) {
for (int t; (t = task_number++) < tasks_per_loop;) {
int block_start = t * block_size;
int block_end = std::min(block_start + block_size, bits.size());
for (int j = block_start; j < block_end; ++j) {
bits[j] = ModifyBit(i, j, hash, bits[j]);
}
}
// Wait for other threads to finish and hash
// to be calculated before starting next loop
barrier.arrive_and_wait();
}
});
}
for (std::thread& t : pool) t.join();
(使用 OpenMP #pragma omp parallel for
并行化 for 循环的看似简单的方法在一些测试中似乎较慢,可能是因为任务太小了)
这里是针对你的实现运行类似的代码:https://godbolt.org/z/en76Kv4nn
在我的机器上,运行 这几次 100 万次迭代用我的方法花费了 28 到 32 秒,而用你的一般线程池方法花费了 44 到 50 秒(当然这是不那么普遍的因为它不能执行任意 std::function<void()>
任务。