Mandelbrot集的多线程计算

Multithreading computation of Mandelbrot set

我创建了一个创建 Mandelbrot 集合的程序。现在我正在尝试让它成为多线程的。

// mandelbrot.cpp
// compile with: g++ -std=c++11 mandelbrot.cpp -o mandelbrot
// view output with: eog mandelbrot.ppm

#include <fstream>
#include <complex> // if you make use of complex number facilities in C++
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <vector>


using namespace std;

template <class T> struct RGB { T r, g, b; };

template <class T>
class Matrix {
public:
Matrix(const size_t rows, const size_t cols) : _rows(rows), _cols(cols) {
    _matrix = new T*[rows];
    for (size_t i = 0; i < rows; ++i) {
        _matrix[i] = new T[cols];
    }
}
Matrix(const Matrix &m) : _rows(m._rows), _cols(m._cols) {
    _matrix = new T*[m._rows];
    for (size_t i = 0; i < m._rows; ++i) {
        _matrix[i] = new T[m._cols];
        for (size_t j = 0; j < m._cols; ++j) {
            _matrix[i][j] = m._matrix[i][j];
        }
    }
}
~Matrix() {
    for (size_t i = 0; i < _rows; ++i) {
        delete [] _matrix[i];
    }
    delete [] _matrix;
}
T *operator[] (const size_t nIndex)
{
    return _matrix[nIndex];
}
size_t width() const { return _cols; }
size_t height() const { return _rows; }
protected:
size_t _rows, _cols;
T **_matrix;
};

// Portable PixMap image
class PPMImage : public Matrix<RGB<unsigned char> >
{
public:
   unsigned int size; 

PPMImage(const size_t height, const size_t width) : Matrix(height, width) { }
void save(const std::string &filename)
{
    std::ofstream out(filename, std::ios_base::binary);
    out <<"P6" << std::endl << _cols << " " << _rows << std::endl << 255 << std::endl;
    for (size_t y=0; y<_rows; y++)
        for (size_t x=0; x<_cols; x++) 
            out << _matrix[y][x].r << _matrix[y][x].g << _matrix[y][x].b;
}    
};

/*Draw mandelbrot according to the provided parameters*/
void draw_Mandelbrot(PPMImage & image, const unsigned width, const unsigned height, double cxmin, double cxmax, double cymin, double cymax,unsigned int max_iterations)                         
{

for (std::size_t ix = 0; ix < width; ++ix)
    for (std::size_t iy = 0; iy < height; ++iy)
    {
        std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
        std::complex<double> z = 0;
        unsigned int iterations;

        for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
            z = z*z + c;

        image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

    }
}

int main()
{
const unsigned width = 1600;
const unsigned height = 1600;

PPMImage image(height, width);


int parts = 8;

std::vector<int>bnd (parts, image.size);

std::thread *tt = new std::thread[parts - 1];

time_t start, end;
time(&start);
//Lauch parts-1 threads
for (int i = 0; i < parts - 1; ++i) {
    tt[i] = std::thread(draw_Mandelbrot,ref(image), width, height, -2.0, 0.5, -1.0, 1.0, 10);
}

//Use the main thread to do part of the work !!!
for (int i = parts - 1; i < parts; ++i) {
    draw_Mandelbrot(ref(image), width, height, -2.0, 0.5, -1.0, 1.0, 10);
}

//Join parts-1 threads
for (int i = 0; i < parts - 1; ++i)
    tt[i].join();

time(&end);
std::cout << difftime(end, start) << " seconds" << std::endl;


image.save("mandelbrot.ppm");

delete[] tt;

return 0;
}

现在每个thread绘制完整的分形(查看main())。如何让线程绘制分形的不同部分?

这种方法的一个大问题是不同的区域需要不同的时间来计算。

更通用的方法是。

  • 启动 1 个源线程。
  • 启动N个工作线程。
  • 启动 1 个接收器线程。
  • 创建 2 个线程安全队列(将它们称为源队列和接收器队列)。
  • 将图像分成M(多于N)个部分。
  • 源线程将片段推入源队列
  • 工作人员从源队列中拉取片段,将片段转换为结果片段,并将这些片段推送到接收器队列中。
  • 接收器线程从接收器队列中取出片段并将它们组合成最终图像。

这样分工,所有的工作线程都会一直很忙。

你让这个(相当多)比它需要的更难。这是 OpenMP 几乎完全适合的任务。对于此任务,它以 bare 最小的努力提供了近乎完美的缩放。

我通过在外部 for 循环之前插入编译指示修改了您的 draw_mandelbrot

#pragma omp parallel for
for (int ix = 0; ix < width; ++ix)
    for (int iy = 0; iy < height; ++iy)

然后我将你的 main 简化为:

int main() {
    const unsigned width = 1600;
    const unsigned height = 1600;

    PPMImage image(height, width);

    clock_t start = clock();
    draw_Mandelbrot(image, width, height, -2.0, 0.5, -1.0, 1.0, 10);
    clock_t stop = clock();

    std::cout << (double(stop - start) / CLOCKS_PER_SEC) << " seconds\n";

    image.save("mandelbrot.ppm");

    return 0;
}

在我的(相当慢的)机器上,您的原始代码 运行 在 4.73 秒内完成。我在 1.38 秒内修改代码 运行。这是代码量的 3.4 倍改进,与普通的单线程版本几乎没有区别。

为了它的价值,我做了更多的重写来得到这个:

// mandelbrot.cpp
// compile with: g++ -std=c++11 mandelbrot.cpp -o mandelbrot
// view output with: eog mandelbrot.ppm

#include <fstream>
#include <complex> // if you make use of complex number facilities in C++
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <vector>

using namespace std;

template <class T> struct RGB { T r, g, b; };

template <class T>
struct Matrix
{
    std::vector<T> data;
    size_t rows;
    size_t cols;

    class proxy {
        Matrix &m;
        size_t index_1;
    public:
        proxy(Matrix &m, size_t index_1) : m(m), index_1(index_1) { }

        T &operator[](size_t index) { return m.data[index * m.rows + index_1]; }
    };

    class const_proxy {
        Matrix const &m;
        size_t index_1;
    public:
        const_proxy(Matrix const &m, size_t index_1) : m(m), index_1(index_1) { }

        T const &operator[](size_t index) const { return m.data[index * m.rows + index_1]; }
    };


public:
    Matrix(size_t rows, size_t cols) : data(rows * cols), rows(rows), cols(cols) { }

    proxy operator[](size_t index) { return proxy(*this, index); }
    const_proxy operator[](size_t index) const { return const_proxy(*this, index); }

};

template <class T>
std::ostream &operator<<(std::ostream &out, Matrix<T> const &m) {
    out << "P6" << std::endl << m.cols << " " << m.rows << std::endl << 255 << std::endl;
    for (size_t y = 0; y < m.rows; y++)
        for (size_t x = 0; x < m.cols; x++) {
            T pixel = m[y][x];
            out << pixel.r << pixel.g << pixel.b;
        }
    return out;
}

/*Draw Mandelbrot according to the provided parameters*/
template <class T>
void draw_Mandelbrot(T & image, const unsigned width, const unsigned height, double cxmin, double cxmax, double cymin, double cymax, unsigned int max_iterations) {

#pragma omp parallel for
    for (int ix = 0; ix < width; ++ix)
        for (int iy = 0; iy < height; ++iy)
        {
            std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
            std::complex<double> z = 0;
            unsigned int iterations;

            for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
                z = z*z + c;

            image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

        }
}

int main() {
    const unsigned width = 1600;
    const unsigned height = 1600;

    Matrix<RGB<unsigned char>> image(height, width);

    clock_t start = clock();
    draw_Mandelbrot(image, width, height, -2.0, 0.5, -1.0, 1.0, 255);
    clock_t stop = clock();

    std::cout << (double(stop - start) / CLOCKS_PER_SEC) << " seconds\n";

    std::ofstream out("mandelbrot.ppm", std::ios::binary);
    out << image;

    return 0;
}

在我的机器上,这段代码运行大约需要 0.5 到 0.6 秒。

至于为什么我做了这些改变:主要是为了让它更快、更干净、更简单。您的 Matrix class 为每一行(或者可能是列——没有特别注意)分配了一个单独的内存块。这会分配整个矩阵的一个连续块。这消除了获取数据的间接级别,并增加了引用的位置,从而提高了缓存使用率。它还减少了使用的数据总量。

从使用 time 更改为使用 clock 进行计时是为了测量 CPU 时间而不是墙上时间(通常也会显着提高精度)。

摆脱 PPMImage class 只是因为 (IMO) 有一个从 Matrix class 派生的 PPImage class 并没有多大用处(如果有的话)感觉。我想它可以工作(对于 "work" 的足够宽松的定义),但它并没有给我留下好的设计印象。如果你坚持这样做,它至少应该是私有推导,因为你只是使用 Matrix 作为实现 PPMImage class 的一种方式,而不是(至少我当然希望不是)试图关于 PPM 图像属性的断言。

如果出于某种原因,您决定手动处理线程,那么在线程之间划分工作的明显方法仍然是查看 draw_mandelbrot 内的循环。最明显的方法是不理会你的外循环,而是将每次迭代的计算发送到线程池: for (int ix = 0; ix < width; ++ix) compute_thread(九);

compute_thread 的主体基本上是这段代码:

        for (int iy = 0; iy < height; ++iy)
        {
            std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
            std::complex<double> z = 0;
            unsigned int iterations;

            for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
                z = z*z + c;

            image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

        }

将正确的数据传递给计算线程显然会涉及一些工作(每个线程都应该传递对结果图片的一部分的引用),但这是一个明显且相当干净的地方分东西。特别是它将工作分成足够多的任务,你可以半自动地获得很好的负载平衡(即,你可以让所有的核心保持忙碌)但又足够大,你不会在通信和同步上浪费大量时间线程。

关于结果,迭代次数设置为 255,我得到以下结果(缩放到 25%):

...这和我预期的差不多。

您可以通过将分形的开始和结束与屏幕尺寸分开来将分形分成几块:

   $this->stepsRe = (double)((($this->startRe * -1) + ($this->endeRe)) / ($this->size_x-1));
   $this->stepsIm = (double)((($this->startIm * -1) + ($this->endeIm)) / ($this->size_y-1));