如何将递归函数的线程与子线程同步

How to synchronize thread for recursive function with sub-threads

我是 C++ 和线程的新手,我被这个问题困了好几天。它应该构成 fft(快速傅里叶变换)的基本代码——只是一个基本代码,所以有几件事是仍然缺少旋转项,输入是双数(还不是复数)。

我想用 C++ 对函数 f_thread 进行一些并行编程...这是一个有效的 'compilable' 代码

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        std::mutex mtx1; std::mutex mtx2;

        std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
        std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b

        t0.join(); t1.join(); // join 2 threads

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

所以f_thread初始化为一个线程然后创建2个子线程递归调用f_thread.我尝试了几种使用互斥体的技巧,但 none 似乎有效,因为 2 个子线程之间的同步进展不顺利(这是竞争条件的热点)。这是我试过但没有用的一段代码。我也尝试过使用全局递归互斥锁,但仍然没有任何改进。

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        std::mutex mtx1; std::mutex mtx2;

        mtx1.lock(); std::thread t0(f_thread,std::ref(f0),std::ref(a)); mtx1.unlock(); //create thread for f_thread on a
        mtx2.lock(); std::thread t1(f_thread,std::ref(f1),std::ref(b)); mtx2.unlock(); //create thread for f_thread on b

        t0.join(); t1.join(); // join 2 threads

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

我必须验证此代码是否使用 g++ f_thread.cpp -pthread 和标准 C++ 库在 linux (ubuntu 18.04) OS[=14= 中编译]

现在的代码 运行s(不再是 'aborted core dumped errors'),但是线程版本的输出在每次 运行 时都会发生变化(表明同步效果不佳)。

供参考,这里是代码的顺序版本,它不使用子线程并且运行良好(即每次输出都没有变化运行)

// WORKING sequential version

#include<iostream>
#include<thread>
#include <vector>
#include <mutex>

void get_odd_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 0; i < inpt.size()-1; i = i + 2) {out[i/2] = inpt[i];}
}

void get_even_elements(std::vector<double> inpt, std::vector<double> &out) {
    for (int i = 1; i < inpt.size(); i = i + 2) {out[i/2] = inpt[i];}
}

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

void add_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = x[i] + y[i];}}

void sub_vectors(std::vector<double> &x, std::vector<double> &y, std::vector<double> &z) {for (int i = 0; i < x.size(); i++) {z[i] = y[i] - x[i];}}

//the f_thread function

void f_thread(std::vector<double> in, std::vector<double> &out) {

    if (in.size() == 1) {out = in;}
    else {

        std::vector<double> f0(in.size()/2);
        std::vector<double> f1(in.size()/2);

        get_odd_elements(in,std::ref(f0)); //get_odd_elements is a function that gets all odd-indexed elements of f
        get_even_elements(in,std::ref(f1)); //get_even_elements is a function that gets all even-indexed elements of in

        std::vector<double> a(f0.size());
        std::vector<double> b(f1.size());

        f_thread(std::ref(f0),std::ref(a)); // no thread, just call recursion 

        f_thread(std::ref(f1),std::ref(b)); // no thread, just call recursion 

        std::vector<double> a_out(f0.size());
        std::vector<double> b_out(f1.size());

        add_vectors(std::ref(a),std::ref(b),std::ref(a_out)); //call add_vectors function : a + b
        sub_vectors(std::ref(a),std::ref(b),std::ref(b_out)); //call sub_vectors function : b - a

        std::vector<double> f_out(in.size());
        attach(a_out,b_out,std::ref(f_out)); //attach is a function that appends b to the end of a
        out = f_out; 
    }
}

int main() {
    int n_elements = 16;
    std::vector<double> sample_input(n_elements);
    for (int i = 0; i < n_elements; i++) {sample_input[i] = i;}
    std::vector<double> output(n_elements);
    std::thread start(f_thread,std::ref(sample_input),std::ref(output));
    start.join();
    for (int i = 0; i < n_elements; i++) {std::cout << "output element "; std::cout << i; std::cout << ": "; std::cout << output[i]; std::cout<< "\n";}
    }

每次代码 运行.

时,结果都应该固定为此输出
output element 0: 120
output element 1: 0
output element 2: 0
output element 3: 7.31217e-322
output element 4: 0
output element 5: 6.46188e-319
output element 6: 56
output element 7: 0
output element 8: 0
output element 9: 4.19956e-322
output element 10: 120
output element 11: 0
output element 12: 0
output element 13: 7.31217e-322
output element 14: 0
output element 15: 6.46188e-319

这不是线程错误,而是越界访问函数中的数组元素 attach:

void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    for (int i = 0; i < a.size(); i++) {out[i] = a[i];}
    for (int i = a.size(); i < a.size()+b.size(); i++) {out[i] = b[i];}
}

在第二个循环中,索引从 a.size() 开始,而不是从 0 开始 - 但您使用它来访问 b 的元素,就像它从 0 开始一样。

您可以使用 <algorithm>:

中的 std::copy 而不是编写循环
void attach(std::vector<double> a, std::vector<double> b, std::vector<double> &out) {
    std::copy(a.begin(), a.end(), out.begin());
    std::copy(b.begin(), b.end(), out.begin()+a.size());
}

之后,对于递归线程你只需要这个:

std::thread t0(f_thread,std::ref(f0),std::ref(a)); //create thread for f_thread on a
std::thread t1(f_thread,std::ref(f1),std::ref(b)); //create thread for f_thread on b
t0.join(); t1.join(); // join 2 threads

没有竞争,因为每个线程使用单独的输入和输出数组(您在 "parent" 线程的堆栈上创建)。结果是确定的,对于顺序和线程版本是相同的:

output element 0: 120
output element 1: 64
output element 2: 32
output element 3: 0
output element 4: 16
output element 5: 0
output element 6: 0
output element 7: 0
output element 8: 8
output element 9: 0
output element 10: 0
output element 11: 0
output element 12: 0
output element 13: 0
output element 14: 0
output element 15: 0

顺便说一句,你可能已经猜到连你的串行版本都不正确,因为输入的数据都是整数,你只复制,加减;所以没有理由像 7.31217e-322 这样的浮点数出现在输出中。

另请注意 Davis Herring 的评论:您在向量之间复制了很多数据。至少,我会通过 const 引用而不是通过值将向量传递给函数(除非知道这些副本已被删除)。

最后,您应该比输入数组大小为 1 时更早地停止创建新线程。对于实际问题大小,您可能无法创建数千个线程;即使您成功了,创建和 运行 这么多线程的开销也会使您的代码 运行 非常非常慢。理想情况下,您创建的线程不应多于代码 运行s.

所在机器上的硬件内核数

您应该通过询问有多少 cpu 来处理这个问题,然后将您的工作拆分并使用队列将其重新组合在一起。

我不知道 FFT 算法,但通过粗略地查看您的代码,您似乎基本上使用越来越细的齿梳将数据拆分。除非你从最好的水平开始,然后逐步提高,否则这不是一个很好的拆分方式。

您不希望不同的 CPU 处理每个其他值,因为即使在单芯片多核 CPU 上,也有多个 L1 缓存。每个 L1 缓存最多与一个其他核心共享。因此,您希望单个 CPU 处理的所有值彼此接近,以最大限度地提高您要查找的值在缓存中的机会。

因此,您应该从最大的连续块开始拆分。因为 FFT 算法基于 2 的幂运算,所以您应该计算您拥有的核心数。使用thread::hardware_concurrency()来计数。然后四舍五入到下一个最高的 2 次方并将您的问题拆分为该数量的子 FFT。然后在主线程中合并他们的结果。

我有一个我编写的程序,可以满足您的需求。它splits up a list into a number of chunks to run sort on。然后它有一个需要完成的合并队列。每个块都由一个单独的线程处理,每个合并也产生到它自己的线程中。

我将核心数一分为二,因为我不喜欢现代 CPU 的一个功能,称为超线程。我本可以忽略它,它会 运行 很好,但由于主要争论是关于整数 ALU,它可能会慢一点。 (超线程在单个内核中共享资源。)

从其他答案看来,您的 FFT 代码似乎有一些错误。我建议让它只使用一个线程,然后弄清楚如何拆分它。