C++分而治之矩阵乘法算法太慢

C++ Divide and conquer matrix multiplication algorithm too slow

我想请你帮忙。我在 C++ 中实现了分而治之算法。它工作正常,但它真的很慢。 512x512 矩阵的计算时间为 90 秒,与使用朴素算法的 0.4 秒相比,这是不可接受的。你能告诉我,如果,我做错了什么吗?

主要的分治功能是:

std::vector<std::vector<int>> Matrix::divideAndConquer(std::vector<std::vector<int>>& matrix1, 
std::vector<std::vector<int>>& matrix2)
{
if (matrix1.size() == 1) {
    return naive(matrix1, matrix2);
}

int newSize = matrix1.size() / 2;
std::vector<int> inner(newSize);
std::vector<std::vector<int>> a11(newSize, inner);
std::vector<std::vector<int>> a12(newSize, inner);
std::vector<std::vector<int>> a21(newSize, inner);
std::vector<std::vector<int>> a22(newSize, inner);

std::vector<std::vector<int>> b11(newSize, inner);
std::vector<std::vector<int>> b12(newSize, inner);
std::vector<std::vector<int>> b21(newSize, inner);
std::vector<std::vector<int>> b22(newSize, inner);

Matrix::divideMatrix(matrix1, a11, a12, a21, a22);
Matrix::divideMatrix(matrix2, b11, b12, b21, b22);

std::vector<std::vector<int>> m1 = divideAndConquer(a11, b11);
std::vector<std::vector<int>> m2 = divideAndConquer(a12, b21);
std::vector<std::vector<int>> m3 = divideAndConquer(a11, b12);
std::vector<std::vector<int>> m4 = divideAndConquer(a12, b22);

std::vector<std::vector<int>> m5 = divideAndConquer(a21, b11);
std::vector<std::vector<int>> m6 = divideAndConquer(a22, b21);
std::vector<std::vector<int>> m7 = divideAndConquer(a21, b12);
std::vector<std::vector<int>> m8 = divideAndConquer(a22, b22);

std::vector<std::vector<int>> c1 = add(m1, m2);
std::vector<std::vector<int>> c2 = add(m3, m4);
std::vector<std::vector<int>> c3 = add(m5, m6);
std::vector<std::vector<int>> c4 = add(m7, m8);

return combine(c1, c2, c3, c4);
}

然后划分矩阵函数为:

void Matrix::divideMatrix(std::vector<std::vector<int>>& matrix, std::vector<std::vector<int>>& m1, 
std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>& 
m4)
{
    for (int i = 0; i < matrix.size() / 2; i++) {
        for (int j = 0; j < matrix.size() / 2; j++) {
            m1[i][j] = matrix[i][j];
            m2[i][j] = matrix[i][j + matrix.size() / 2];
            m3[i][j] = matrix[i + (matrix.size() / 2)][j];
            m4[i][j] = matrix[i + (matrix.size() / 2)][j + (matrix.size() / 2)];
        }
    }
}

合并方法为:

std::vector<std::vector<int>> Matrix::combine(std::vector<std::vector<int>>& m1, std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>& m4)
{

    std::vector<int> inner(m1[0].size() + m2[0].size());
    std::vector<std::vector<int>> result(m1.size() + m2.size(), inner);

    for (int i = 0; i < m1.size(); i++) {
        for (int j = 0; j < m2.size(); j++) {
            result[i][j] = m1[i][j];
            result[i][j + m1[0].size()] = m2[i][j];
            result[i + m1.size()][j] = m3[i][j];
            result[i + m1.size()][j + m3[0].size()] = m4[i][j];
        }
    }

    return result;
}

最后的添加方法是:

std::vector<std::vector<int>> Matrix::add(std::vector<std::vector<int>>& matrix1, std::vector<std::vector<int>>& matrix2)
{
    std::vector<std::vector<int>> result(matrix1);

    for (int i = 0; i < matrix1.size(); i++) {
        for (int j = 0; j < matrix1.size(); j++) {
            result[i][j] = matrix1[i][j] + matrix2[i][j];
        }
    }

    return result;
}

朴素函数只是 returns 矩阵 1[0][0] * 矩阵 2[0][0] 在新向量中的乘法

struct matrix_view {
  int* m_data = 0;
  std::size_t m_stride = 1; // distance between lines
  std::size_t m_width = 0;
  std::size_t m_height = 0;

  int& operator()(std::size_t i, std::size_t j) const {
    return *get(i,j);
  }
  int* get(std::size_t i, std::size_t j) const {
    return m_data+(i*m_stride + j);
  }

  matrix_view slice(std::size_t i, std::size_t j, std::size_t width, std::size_t height ) {
    return {get(i,j), stride, width, height};
  }
};

struct matrix:matrix_view {
  std::vector<int> m_storage;
  matrix( std::size_t width, std::size_t height ):
    matrix_view{ nullptr, height, width, height },
    m_storage( width*height )
  {
    m_data = m_storage.data();
  }
  // these are unsafe unless we write them manually, because
  // matrix_view gets out of sync.
  matrix( matrix const& ) = delete;
  matrix& operator=( matrix const& ) = delete;
};

有了这些你就可以避免一堆内存复制。

它仍然可能打不过天真,因为花式乘法通常需要大尺寸才能打败天真。

这是一个未经测试的版本:

namespace Matrix {
    struct matrix_view {
      int* m_data = 0;
      std::size_t m_stride = 1; // distance between lines
      std::size_t m_size = 0;

      int& operator()(std::size_t i, std::size_t j) const {
        return *get(i,j);
      }
      int* get(std::size_t i, std::size_t j) const {
        return m_data+(i*m_stride + j);
      }

      matrix_view slice(std::size_t i, std::size_t j, std::size_t size ) {
        return {get(i,j), m_stride, size};
      }

      std::size_t count() const { return m_size * m_size; }
    };

    struct matrix:matrix_view {
      std::vector<int> m_storage;

      matrix( std::vector<int> storage, std::size_t size ):
        matrix_view{ storage.data(), size, size },
        m_storage(std::move(storage) )
      {
      }
      explicit matrix( std::size_t size ):
        matrix( std::vector<int>(size*size), size )
      {}
      // these are unsafe unless we write them manually, because
      // matrix_view gets out of sync.
      matrix( matrix const& o ):
        matrix(o.m_storage, o.m_size )
      {}
      matrix( matrix && o ):
        matrix(std::move(o.m_storage), o.m_size )
      {
        o.m_size = 0;
      }

      matrix& operator=( matrix const& ) = delete;
    };

    struct quad_t {
        matrix_view sub_11;
        matrix_view sub_12;
        matrix_view sub_21;
        matrix_view sub_22;
    };

    quad_t quad( matrix_view matrix )
    {
        quad_t r {
            matrix.slice(0,0, matrix.m_size/2 ),  matrix.slice(0,matrix.m_size/2, matrix.m_size/2 ),
            matrix.slice(matrix.m_size/2,0, matrix.m_size/2 ), matrix.slice(matrix.m_size/2, matrix.m_size/2, matrix.m_size/2 )
        };
        return r;
    }
    matrix add(matrix_view matrix1, matrix_view matrix2);
    matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4);
    matrix naive(matrix_view m1, matrix_view m2);

    matrix divideAndConquer(matrix_view matrix1, matrix_view matrix2)
    {
      if (matrix1.count() == 1) {
        return naive(matrix1, matrix2);
      }


      auto a = quad(matrix1);
      auto b = quad(matrix2);

      auto m1 = divideAndConquer(a.sub_11, b.sub_11);
      auto m2 = divideAndConquer(a.sub_12, b.sub_21);
      auto m3 = divideAndConquer(a.sub_11, b.sub_12);
      auto m4 = divideAndConquer(a.sub_12, b.sub_22);

      auto m5 = divideAndConquer(a.sub_21, b.sub_11);
      auto m6 = divideAndConquer(a.sub_22, b.sub_21);
      auto m7 = divideAndConquer(a.sub_21, b.sub_12);
      auto m8 = divideAndConquer(a.sub_22, b.sub_22);

      auto c1 = add(m1, m2);
      auto c2 = add(m3, m4);
      auto c3 = add(m5, m6);
      auto c4 = add(m7, m8);

      return combine(c1, c2, c3, c4);
    }


    matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4)
    {
      matrix result = matrix( m1.m_size*2 );

      for (std::size_t i = 0; i < m1.m_size; i++) {
        for (std::size_t j = 0; j < m1.m_size; j++) {
          result(i,j) = m1(i,j);
          result(i, j + m1.m_size) = m2(i,j);
          result(i + m1.m_size,j) = m3(i,j);
          result(i + m1.m_size,j + m3.m_size) = m4(i,j);
        }
      }

      return result;
    }

    matrix add(matrix_view matrix1, matrix_view matrix2)
    {
        matrix result(matrix1.m_size);

        for (std::size_t i = 0; i < matrix1.m_size; i++) {
            for (std::size_t j = 0; j < matrix1.m_size; j++) {
                result(i,j) = matrix1(i,j) + matrix2(i,j);
            }
        }

        return result;
    }
    matrix naive(matrix_view m1, matrix_view m2)
    {
      matrix r = matrix(m1.m_size);
      for (std::size_t i = 0; i < m1.m_size; ++i)
        for (std::size_t j = 0; j < m1.m_size; ++j)
          for (std::size_t k = 0; k < m1.m_size; ++k)
            r(i,j) += m1(i,k) * m2(k,j);
      return r;
    }
    void print( std::ostream& os, matrix_view m1 )
    {
      for (std::size_t i = 0; i < m1.m_size; ++i)
      {
        for (std::size_t j = 0; j < m1.m_size; ++j)
          std::cout << m1(i,j) << " ";
        std::cout << "\n";
      }
    }
}

int main()
{
    Matrix::matrix m1(16), m2(16);
    for (int i = 0; i < m1.m_size; ++i)
        m1(i,i) = 1;
    for (int i = 0; i < m1.m_size; ++i)
        m2(m2.m_size-i-1,i) = 1;

    auto m3 = Matrix::divideAndConquer(m1, m2);

    print(std::cout, m3);
}

重用 "result" 矩阵将给速度带来另一个巨大的提升。另外,在尺寸大于 1 时切换到 naive。

Live example.