并行快速处理 std::bitset<65536>

Fast process std::bitset<65536> in parallel

曾经有一个被删除的question,我写了一个巨大的答案,但是这个问题被删除了,作者拒绝取消删除。

所以在这里发布这个问题的简短摘要。并立即亲自回答这个问题,只是为了分享我的结果。

问题是,如果我们给定 std::bitset<65536> 在内循环中逐位处理(通过某些公式),那么我们如何提高计算量?

外层循环只是调用了内层循环很多次(比如说50 000次),外层循环不能并行处理,因为每次下一次迭代都取决于上一次迭代的结果。

这个过程的示例代码:

std::bitset<65536> bits{};
uint64_t hash = 0;
for (size_t i = 0; i < 50000; ++i) {
    // Process Bits
    for (size_t j = 0; j < bits.size(); ++j)
        bits[j] = ModifyBit(i, j, hash, bits[j]);
    hash = Hash(bits, hash);
}

以上代码只是一种示例处理方式,并非真实案例。实际情况是这样的,很多时候我们以某种方式处理 std::bitset<65536>,所有位都可以独立处理。

问题是我们如何在内循环中尽可能快地并行处理位。

一个重要的注意修改位的公式是通用的,这意味着我们事先不知道它并且不能从中做出SIMD指令.

但我们知道所有位都可以独立处理。而且我们需要并行处理这个过程。我们也不能并行化外部循环,因为它的每次迭代都取决于前一次迭代的结果。

另一个注意是std::bitset<65536>很小,只有1K的64位字。因此,这意味着直接使用 std::thread of std::async 线程的池将不起作用,因为每个线程的工作将只有 50-200 纳秒左右,启动和停止线程以及将工作发送到的时间非常短他们。即使 std::mutex 在我的 Windows 机器上也需要 75 纳秒(尽管在 Linux 上需要 20 纳秒),所以使用 std::mutex 也是一个很大的开销。

人们可能会假设上面的 ModifyBit() 函数对每个位花费大约相同的时间,否则就无法理解如何安排循环的平衡并行化,只能将其切成许多小任务,希望较长的任务将被几个较短的任务所平衡。

为您的任务实施了相当庞大和复杂的解决方案,但运行速度非常快。在我的 4 核(8 个硬件线程)笔记本电脑上,与单线程版本(您的代码版本)相比,我有 6x 倍 multi-core 的加速。

以下解决方案的主要思想是为 运行ning 具有小开销的任意任务实现非常快速的多核 Thread-Pool。我的实现每秒最多可以处理 1-1000 万个任务(取决于 CPU 速度和内核数)。

异步启动多个任务的常规方法是使用 std::async or just by creating std::thread。这两种方式都比我自己的实现慢得多。他们不能像我的实现那样提供每秒 500 万个任务的吞吐量。并且您的代码需要每秒执行数百万个任务才能达到 运行 以获得良好的速度。这就是为什么我从头开始实施所有内容的原因。

现在实现快速线程池后,我们可以将您的 64K 位集分割成更小的 sub-sets 并并行处理这些 sub-sets。我把64K的bitset分成了16等份(见代码中的BitSize / 16),你可以设置其他的份量等于2的幂,但不要太多,否则线程池开销会太大。通常最好分割成等于硬件线程数量两倍(或内核数量的 4 倍)的部分数量。

我用 C++ 代码实现了几个 classes。 AtomicMutex class 使用 std::atomic_flag 来快速替换基于 spin-locking 方法的互斥体。此 AtomicMutex 用于保护为线程池上的 运行ning 提交的任务队列。

RingBuffer class基于std::vector实现了简单快速的队列来存储任何对象。它是使用两个指向向量的指针(头和尾)实现的。当新元素添加到队列时,尾指针向右前进,如果该指针到达向量的末尾,则它环绕到第 0 个位置。同样,当元素从队列中取出时,头指针也向右前进并环绕。 RingBuffer用于存放线程池任务

Queue class 是 RingBuffer 的包装器,但具有 AtomicMutex 保护。此 spin-lock 互斥体用于保护同步 adding/taking 元素 to/from 队列免受多个工作线程的影响。

Pool 实现 multi-core 任务池本身。它创建的工作线程数与 CPU 硬件线程数(内核数量的两倍)减去一个数一样多。每个工作线程只是从队列中轮询新任务并立即执行它们。主线程将新任务添加到队列中。 Pool 也有等待所有当前任务完成的 Wait() 能力,这个等待被用作等待整个 64K 位集被处理(所有 sub-parts 被处理)的屏障。 Pool 接受任何 lambda(函数闭包)为 运行。你可以看到 64K 位集被分割成更小的部分是通过 pool.Emplace(lambda) 处理的,后来 pool.Wait() 用于等待所有 sub-parts 完成。如果有任何错误,池工作人员的异常将被收集并报告给用户。在主线程内执行 Wait() 池 运行s 任务时,不要浪费一个核心来等待任务完成。

控制台中报告的时间由 std::chrono 模块完成。

可以 运行 两个版本 - single-threaded(您的原始版本)和 multi-threaded 使用所有内核。 single/multi 之间的切换是通过将 MultiThreaded = true 模板参数传递给函数 ProcessBitset().

来完成的

Try it online!

#include <cstdint>
#include <atomic>
#include <vector>
#include <array>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <exception>
#include <optional>
#include <memory>
#include <iostream>
#include <iomanip>
#include <bitset>
#include <string>
#include <chrono>
#include <algorithm>
#include <any>
#include <type_traits>

class AtomicMutex {
    class LockerC;
public:
    void lock() {
        while (f_.test_and_set(std::memory_order_acquire))
            //f_.wait(true, std::memory_order_acquire)
        ;
    }
    void unlock() {
        f_.clear(std::memory_order_release);
        //f_.notify_all();
    }
    LockerC Locker() { return LockerC(*this); }
private:
    class LockerC {
    public:
        LockerC() = delete;
        LockerC(AtomicMutex & mux) : pmux_(&mux) { mux.lock(); }
        LockerC(LockerC const & other) = delete;
        LockerC(LockerC && other) : pmux_(other.pmux_) { other.pmux_ = nullptr; }
        ~LockerC() { if (pmux_) pmux_->unlock(); }
        LockerC & operator = (LockerC const & other) = delete;
        LockerC & operator = (LockerC && other) = delete;
    private:
        AtomicMutex * pmux_ = nullptr;
    };
    
    std::atomic_flag f_ = ATOMIC_FLAG_INIT;
};

template <typename T>
class RingBuffer {
public:
    RingBuffer() : buf_(1 << 8), last_(buf_.size() - 1) {}
    T & front() { return buf_[first_]; }
    T const & front() const { return buf_[first_]; }
    T & back() { return buf_[last_]; }
    T const & back() const { return buf_[last_]; }
    size_t size() const { return size_; }
    bool empty() const { return size_ == 0; }
    
    template <typename ... Args>
    void emplace(Args && ... args) {
        while (size_ >= buf_.size()) {
            std::rotate(&buf_[0], &buf_[first_], &buf_[buf_.size()]);
            first_ = 0;
            last_ = buf_.size() - 1;
            buf_.resize(buf_.size() * 2);
        }
        ++size_;
        ++last_;
        if (last_ >= buf_.size())
            last_ = 0;
        buf_[last_] = T(std::forward<Args>(args)...);
    }
    
    void pop() {
        if (size_ == 0)
            return;
        --size_;
        ++first_;
        if (first_ >= buf_.size())
            first_ = 0;
    }
private:
    std::vector<T> buf_;
    size_t first_ = 0, last_ = 0, size_ = 0;
};

template <typename T>
class Queue {
public:
    size_t Size() const { return q_.size(); }
    bool Empty() const { return q_.size() == 0; }
    
    template <typename ... Args>
    void Emplace(Args && ... args) {
        auto lock = m_.Locker();
        q_.emplace(std::forward<Args>(args)...);
    }
    
    T Pop(std::function<void()> const & on_empty = []{},
          std::function<void()> const & on_full  = []{}) {
        while (true) {
            if (q_.empty()) {
                on_empty();
                continue;
            }
            auto lock = m_.Locker();
            if (q_.empty()) {
                on_empty();
                continue;
            }
            on_full();
            T val = std::move(q_.front());
            q_.pop();
            return std::move(val);
        }
    }
    
    std::optional<T> TryPop() {
        auto lock = m_.Locker();
        if (q_.empty())
            return std::nullopt;
        T val = std::move(q_.front());
        q_.pop();
        return std::move(val);
    }
private:
    AtomicMutex m_;
    RingBuffer<T> q_;
};

class RunInDestr {
public:
    RunInDestr(std::function<void()> const & f) : f_(f) {}
    ~RunInDestr() { f_(); }
private:
    std::function<void()> const & f_;
};

class Pool {
public:
    struct FinishExc {};
    struct Worker {
        std::unique_ptr<std::atomic<bool>> pdone = std::make_unique<std::atomic<bool>>(true);
        std::unique_ptr<std::exception_ptr> pexc = std::make_unique<std::exception_ptr>();
        std::unique_ptr<std::thread> thr;
    };
    Pool(size_t nthreads = size_t(-1)) {
        if (nthreads == size_t(-1))
            nthreads = std::thread::hardware_concurrency() - 1;
        std::cout << "Pool has " << nthreads << " worker threads." << std::endl;
        for (size_t i = 0; i < nthreads; ++i) {
            workers_.emplace_back(Worker{});
            workers_.back().thr = std::make_unique<std::thread>(
                [&, pdone = workers_.back().pdone.get(), pexc = workers_.back().pexc.get()]{
                    try {
                        std::function<void()> f_done = [pdone]{
                            pdone->store(true, std::memory_order_relaxed);
                        }, f_empty = [this]{
                            CheckFinish();
                        }, f_full = [pdone]{
                            pdone->store(false, std::memory_order_relaxed);
                        };
                        while (true) {
                            RunInDestr set_done(f_done);
                            tasks_.Pop(f_empty, f_full)();
                        }
                    } catch (...) {
                        exc_.store(true, std::memory_order_relaxed);
                        *pexc = std::current_exception();
                    }
                });
        }
    }
    ~Pool() {
        Wait();
        Finish();
    }
    void CheckExc() {
        if (!exc_.load(std::memory_order_relaxed))
            return;
        Finish();
        throw std::runtime_error("Pool: Exception occured!");
    }
    void Finish() {
        finish_ = true;
        for (auto & w: workers_)
            try {
                w.thr->join();
                if (*w.pexc)
                    std::rethrow_exception(*w.pexc);
            } catch (FinishExc const &) {}
        workers_.clear();
    }
    template <typename ... Args>
    void Emplace(Args && ... args) {
        CheckExc();
        tasks_.Emplace(std::forward<Args>(args)...);
    }
    void Wait() {
        while (true) {
            auto task = tasks_.TryPop();
            if (!task)
                break;
            (*task)();
        }
        
        while (true) {
            bool done = true;
            for (auto & w: workers_)
                if (!w.pdone->load(std::memory_order_relaxed)) {
                    done = false;
                    break;
                }
            if (done)
                break;
        }
        
        CheckExc();
    }
private:
    void CheckFinish() {
        if (finish_)
            throw FinishExc{};
    }
    Queue<std::function<void()>> tasks_;
    std::vector<Worker> workers_;
    bool finish_ = false;
    std::atomic<bool> exc_ = false;
};

template <bool MultiThreaded = true, size_t BitSize>
void ProcessBitset(Pool & pool, std::bitset<BitSize> & bset,
        std::string const & businessLogicCriteria) {
    static size_t constexpr block = BitSize / 16;
    for (int j = 0; j < BitSize; j += block) {
        auto task = [&bset, j]{
            int const hi = std::min(j + block, BitSize);
            for (int i = j; i < hi; ++i) {
                if (i % 2 == 0)
                    bset[i] = 0;
                else
                    bset[i] = 1;
            }
        };
        if constexpr(MultiThreaded)
            pool.Emplace(std::move(task));
        else
            task();
    }
    if constexpr(MultiThreaded)
        pool.Wait();
}

static auto const gtb = std::chrono::high_resolution_clock::now();

double Time() {
    return std::chrono::duration_cast<std::chrono::duration<double>>(
        std::chrono::high_resolution_clock::now() - gtb).count();
}

void Compute() {
    Pool pool;
    
    std::bitset<65536> bset;
    std::string businessLogicCriteria;
    
    int const hi = 50000;
    for (int j = 0; j < hi; ++j) {
        if ((j & 0x1FFF) == 0 || j + 1 >= hi)
            std::cout << j / 1000 << "K (" << std::fixed << std::setprecision(3) << Time() << " sec), " << std::flush;
        ProcessBitset(pool, bset, businessLogicCriteria);
        businessLogicCriteria = "...";
    }
}

void TimeMeasure() {
    size_t constexpr A = 1 << 16, B = 1 << 5;
    {
        Pool pool;
        auto const tb = Time();
        int64_t volatile x = 0;
        for (size_t i = 0; i < A; ++i) {
            for (size_t j = 0; j < B; ++j)
                pool.Emplace([&]{ x = x + 1; });
            pool.Wait();
        }
        std::cout << "AtomicPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
            << " sec, speed " << A * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
            << 1'000'000 / (A * B / (Time() - tb)) << " sec/M-task, no-collisions "
            << std::setprecision(7) << double(x) / (A * B) << std::endl;
    }
    
    {
        auto const tb = Time();
        //size_t const nthr = std::thread::hardware_concurrency();
        size_t constexpr C = A / 8;
        std::vector<std::future<void>> asyncs;
        int64_t volatile x = 0;
        for (size_t i = 0; i < C; ++i) {
            asyncs.clear();
            for (size_t j = 0; j < B; ++j)
                asyncs.emplace_back(std::async(std::launch::async, [&]{ x = x + 1; }));
            asyncs.clear();
        }
        std::cout << "AsyncPool time " << std::fixed << std::setprecision(3) << (Time() - tb)
            << " sec, speed " << C * B / (Time() - tb) / 1000.0 << " empty K-tasks/sec, "
            << 1'000'000 / (C * B / (Time() - tb)) << " sec/M-task, no-collisions "
            << std::setprecision(7) << double(x) / (C * B) << std::endl;
    }
}

int main() {
    try {
        TimeMeasure();
        Compute();
        return 0;
    } catch (std::exception const & ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
        return -1;
    } catch (...) {
        std::cout << "Unknown Exception!" << std::endl;
        return -1;
    }
}

4 核(8 个硬件线程)的输出:

Pool has 7 worker threads.
AtomicPool time 0.903 sec, speed 2321.831 empty K-tasks/sec, 0.431 sec/M-task, no-collisions 0.9999967
AsyncPool time 0.982 sec, speed 266.789 empty K-tasks/sec, 3.750 sec/M-task, no-collisions 0.9999123
Pool has 7 worker threads.
0K (0.074 sec), 8K (0.670 sec), 16K (1.257 sec), 24K (1.852 sec), 32K (2.435 sec), 40K (2.984 sec), 49K (3.650 sec), 49K (3.711 sec),

下面的比较是 single-threaded 版本时间,即慢 6x 倍:

0K (0.125 sec), 8K (3.786 sec), 16K (7.754 sec), 24K (11.202 sec), 32K (14.662 sec), 40K (18.056 sec), 49K (21.470 sec), 49K (21.841 sec),

您有想要并行化的内部循环:

for (size_t j = 0; j < bits.size(); ++j)
    bits[j] = ModifyBit(i, j, hash, bits[j]);

所以一个好主意是将它分成块,让多个线程并行处理每个块。您可以使用 std::atomic<int> 计数器轻松地将块提交给工作人员,该计数器递增以识别要处理的块。您还可以确保线程在一个循环后全部停止工作,然后再使用 std::barrier:

开始下一个循环
std::bitset<65536> bits{};

std::thread pool[8];  // Change size accordingly
std::atomic<int> task_number{0};
constexpr std::size_t tasks_per_loop = 32;  // Arbitrarily chosen
constexpr std::size_t block_size = (bits.size()+tasks_per_loop-1) / tasks_per_loop;

// (only written to by one thread by the barrier, so not atomic)
uint64_t hash = 0;
int i = 0;

std::barrier barrier(std::size(pool), [&]() {
    task_number = 0;
    ++i;
    hash = Hash(bits, hash);
});

for (std::thread& t : pool) {
    t = std::thread([&]{
        while (i < 50000) {
            for (int t; (t = task_number++) < tasks_per_loop;) {
                int block_start = t * block_size;
                int block_end = std::min(block_start + block_size, bits.size());
                for (int j = block_start; j < block_end; ++j) {
                    bits[j] = ModifyBit(i, j, hash, bits[j]);
                }
            }

            // Wait for other threads to finish and hash
            // to be calculated before starting next loop
            barrier.arrive_and_wait();
        }
    });
}

for (std::thread& t : pool) t.join();

(使用 OpenMP #pragma omp parallel for 并行化 for 循环的看似简单的方法在一些测试中似乎较慢,可能是因为任务太小了)

这里是针对你的实现运行类似的代码:https://godbolt.org/z/en76Kv4nn

在我的机器上,运行 这几次 100 万次迭代用我的方法花费了 28 到 32 秒,而用你的一般线程池方法花费了 44 到 50 秒(当然这是不那么普遍的因为它不能执行任意 std::function<void()> 任务。