C++分而治之矩阵乘法算法太慢
C++ Divide and conquer matrix multiplication algorithm too slow
我想请你帮忙。我在 C++ 中实现了分而治之算法。它工作正常,但它真的很慢。 512x512 矩阵的计算时间为 90 秒,与使用朴素算法的 0.4 秒相比,这是不可接受的。你能告诉我,如果,我做错了什么吗?
主要的分治功能是:
std::vector<std::vector<int>> Matrix::divideAndConquer(std::vector<std::vector<int>>& matrix1,
std::vector<std::vector<int>>& matrix2)
{
if (matrix1.size() == 1) {
return naive(matrix1, matrix2);
}
int newSize = matrix1.size() / 2;
std::vector<int> inner(newSize);
std::vector<std::vector<int>> a11(newSize, inner);
std::vector<std::vector<int>> a12(newSize, inner);
std::vector<std::vector<int>> a21(newSize, inner);
std::vector<std::vector<int>> a22(newSize, inner);
std::vector<std::vector<int>> b11(newSize, inner);
std::vector<std::vector<int>> b12(newSize, inner);
std::vector<std::vector<int>> b21(newSize, inner);
std::vector<std::vector<int>> b22(newSize, inner);
Matrix::divideMatrix(matrix1, a11, a12, a21, a22);
Matrix::divideMatrix(matrix2, b11, b12, b21, b22);
std::vector<std::vector<int>> m1 = divideAndConquer(a11, b11);
std::vector<std::vector<int>> m2 = divideAndConquer(a12, b21);
std::vector<std::vector<int>> m3 = divideAndConquer(a11, b12);
std::vector<std::vector<int>> m4 = divideAndConquer(a12, b22);
std::vector<std::vector<int>> m5 = divideAndConquer(a21, b11);
std::vector<std::vector<int>> m6 = divideAndConquer(a22, b21);
std::vector<std::vector<int>> m7 = divideAndConquer(a21, b12);
std::vector<std::vector<int>> m8 = divideAndConquer(a22, b22);
std::vector<std::vector<int>> c1 = add(m1, m2);
std::vector<std::vector<int>> c2 = add(m3, m4);
std::vector<std::vector<int>> c3 = add(m5, m6);
std::vector<std::vector<int>> c4 = add(m7, m8);
return combine(c1, c2, c3, c4);
}
然后划分矩阵函数为:
void Matrix::divideMatrix(std::vector<std::vector<int>>& matrix, std::vector<std::vector<int>>& m1,
std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>&
m4)
{
for (int i = 0; i < matrix.size() / 2; i++) {
for (int j = 0; j < matrix.size() / 2; j++) {
m1[i][j] = matrix[i][j];
m2[i][j] = matrix[i][j + matrix.size() / 2];
m3[i][j] = matrix[i + (matrix.size() / 2)][j];
m4[i][j] = matrix[i + (matrix.size() / 2)][j + (matrix.size() / 2)];
}
}
}
合并方法为:
std::vector<std::vector<int>> Matrix::combine(std::vector<std::vector<int>>& m1, std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>& m4)
{
std::vector<int> inner(m1[0].size() + m2[0].size());
std::vector<std::vector<int>> result(m1.size() + m2.size(), inner);
for (int i = 0; i < m1.size(); i++) {
for (int j = 0; j < m2.size(); j++) {
result[i][j] = m1[i][j];
result[i][j + m1[0].size()] = m2[i][j];
result[i + m1.size()][j] = m3[i][j];
result[i + m1.size()][j + m3[0].size()] = m4[i][j];
}
}
return result;
}
最后的添加方法是:
std::vector<std::vector<int>> Matrix::add(std::vector<std::vector<int>>& matrix1, std::vector<std::vector<int>>& matrix2)
{
std::vector<std::vector<int>> result(matrix1);
for (int i = 0; i < matrix1.size(); i++) {
for (int j = 0; j < matrix1.size(); j++) {
result[i][j] = matrix1[i][j] + matrix2[i][j];
}
}
return result;
}
朴素函数只是 returns 矩阵 1[0][0] * 矩阵 2[0][0] 在新向量中的乘法
struct matrix_view {
int* m_data = 0;
std::size_t m_stride = 1; // distance between lines
std::size_t m_width = 0;
std::size_t m_height = 0;
int& operator()(std::size_t i, std::size_t j) const {
return *get(i,j);
}
int* get(std::size_t i, std::size_t j) const {
return m_data+(i*m_stride + j);
}
matrix_view slice(std::size_t i, std::size_t j, std::size_t width, std::size_t height ) {
return {get(i,j), stride, width, height};
}
};
struct matrix:matrix_view {
std::vector<int> m_storage;
matrix( std::size_t width, std::size_t height ):
matrix_view{ nullptr, height, width, height },
m_storage( width*height )
{
m_data = m_storage.data();
}
// these are unsafe unless we write them manually, because
// matrix_view gets out of sync.
matrix( matrix const& ) = delete;
matrix& operator=( matrix const& ) = delete;
};
有了这些你就可以避免一堆内存复制。
它仍然可能打不过天真,因为花式乘法通常需要大尺寸才能打败天真。
这是一个未经测试的版本:
namespace Matrix {
struct matrix_view {
int* m_data = 0;
std::size_t m_stride = 1; // distance between lines
std::size_t m_size = 0;
int& operator()(std::size_t i, std::size_t j) const {
return *get(i,j);
}
int* get(std::size_t i, std::size_t j) const {
return m_data+(i*m_stride + j);
}
matrix_view slice(std::size_t i, std::size_t j, std::size_t size ) {
return {get(i,j), m_stride, size};
}
std::size_t count() const { return m_size * m_size; }
};
struct matrix:matrix_view {
std::vector<int> m_storage;
matrix( std::vector<int> storage, std::size_t size ):
matrix_view{ storage.data(), size, size },
m_storage(std::move(storage) )
{
}
explicit matrix( std::size_t size ):
matrix( std::vector<int>(size*size), size )
{}
// these are unsafe unless we write them manually, because
// matrix_view gets out of sync.
matrix( matrix const& o ):
matrix(o.m_storage, o.m_size )
{}
matrix( matrix && o ):
matrix(std::move(o.m_storage), o.m_size )
{
o.m_size = 0;
}
matrix& operator=( matrix const& ) = delete;
};
struct quad_t {
matrix_view sub_11;
matrix_view sub_12;
matrix_view sub_21;
matrix_view sub_22;
};
quad_t quad( matrix_view matrix )
{
quad_t r {
matrix.slice(0,0, matrix.m_size/2 ), matrix.slice(0,matrix.m_size/2, matrix.m_size/2 ),
matrix.slice(matrix.m_size/2,0, matrix.m_size/2 ), matrix.slice(matrix.m_size/2, matrix.m_size/2, matrix.m_size/2 )
};
return r;
}
matrix add(matrix_view matrix1, matrix_view matrix2);
matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4);
matrix naive(matrix_view m1, matrix_view m2);
matrix divideAndConquer(matrix_view matrix1, matrix_view matrix2)
{
if (matrix1.count() == 1) {
return naive(matrix1, matrix2);
}
auto a = quad(matrix1);
auto b = quad(matrix2);
auto m1 = divideAndConquer(a.sub_11, b.sub_11);
auto m2 = divideAndConquer(a.sub_12, b.sub_21);
auto m3 = divideAndConquer(a.sub_11, b.sub_12);
auto m4 = divideAndConquer(a.sub_12, b.sub_22);
auto m5 = divideAndConquer(a.sub_21, b.sub_11);
auto m6 = divideAndConquer(a.sub_22, b.sub_21);
auto m7 = divideAndConquer(a.sub_21, b.sub_12);
auto m8 = divideAndConquer(a.sub_22, b.sub_22);
auto c1 = add(m1, m2);
auto c2 = add(m3, m4);
auto c3 = add(m5, m6);
auto c4 = add(m7, m8);
return combine(c1, c2, c3, c4);
}
matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4)
{
matrix result = matrix( m1.m_size*2 );
for (std::size_t i = 0; i < m1.m_size; i++) {
for (std::size_t j = 0; j < m1.m_size; j++) {
result(i,j) = m1(i,j);
result(i, j + m1.m_size) = m2(i,j);
result(i + m1.m_size,j) = m3(i,j);
result(i + m1.m_size,j + m3.m_size) = m4(i,j);
}
}
return result;
}
matrix add(matrix_view matrix1, matrix_view matrix2)
{
matrix result(matrix1.m_size);
for (std::size_t i = 0; i < matrix1.m_size; i++) {
for (std::size_t j = 0; j < matrix1.m_size; j++) {
result(i,j) = matrix1(i,j) + matrix2(i,j);
}
}
return result;
}
matrix naive(matrix_view m1, matrix_view m2)
{
matrix r = matrix(m1.m_size);
for (std::size_t i = 0; i < m1.m_size; ++i)
for (std::size_t j = 0; j < m1.m_size; ++j)
for (std::size_t k = 0; k < m1.m_size; ++k)
r(i,j) += m1(i,k) * m2(k,j);
return r;
}
void print( std::ostream& os, matrix_view m1 )
{
for (std::size_t i = 0; i < m1.m_size; ++i)
{
for (std::size_t j = 0; j < m1.m_size; ++j)
std::cout << m1(i,j) << " ";
std::cout << "\n";
}
}
}
int main()
{
Matrix::matrix m1(16), m2(16);
for (int i = 0; i < m1.m_size; ++i)
m1(i,i) = 1;
for (int i = 0; i < m1.m_size; ++i)
m2(m2.m_size-i-1,i) = 1;
auto m3 = Matrix::divideAndConquer(m1, m2);
print(std::cout, m3);
}
重用 "result" 矩阵将给速度带来另一个巨大的提升。另外,在尺寸大于 1 时切换到 naive。
我想请你帮忙。我在 C++ 中实现了分而治之算法。它工作正常,但它真的很慢。 512x512 矩阵的计算时间为 90 秒,与使用朴素算法的 0.4 秒相比,这是不可接受的。你能告诉我,如果,我做错了什么吗?
主要的分治功能是:
std::vector<std::vector<int>> Matrix::divideAndConquer(std::vector<std::vector<int>>& matrix1,
std::vector<std::vector<int>>& matrix2)
{
if (matrix1.size() == 1) {
return naive(matrix1, matrix2);
}
int newSize = matrix1.size() / 2;
std::vector<int> inner(newSize);
std::vector<std::vector<int>> a11(newSize, inner);
std::vector<std::vector<int>> a12(newSize, inner);
std::vector<std::vector<int>> a21(newSize, inner);
std::vector<std::vector<int>> a22(newSize, inner);
std::vector<std::vector<int>> b11(newSize, inner);
std::vector<std::vector<int>> b12(newSize, inner);
std::vector<std::vector<int>> b21(newSize, inner);
std::vector<std::vector<int>> b22(newSize, inner);
Matrix::divideMatrix(matrix1, a11, a12, a21, a22);
Matrix::divideMatrix(matrix2, b11, b12, b21, b22);
std::vector<std::vector<int>> m1 = divideAndConquer(a11, b11);
std::vector<std::vector<int>> m2 = divideAndConquer(a12, b21);
std::vector<std::vector<int>> m3 = divideAndConquer(a11, b12);
std::vector<std::vector<int>> m4 = divideAndConquer(a12, b22);
std::vector<std::vector<int>> m5 = divideAndConquer(a21, b11);
std::vector<std::vector<int>> m6 = divideAndConquer(a22, b21);
std::vector<std::vector<int>> m7 = divideAndConquer(a21, b12);
std::vector<std::vector<int>> m8 = divideAndConquer(a22, b22);
std::vector<std::vector<int>> c1 = add(m1, m2);
std::vector<std::vector<int>> c2 = add(m3, m4);
std::vector<std::vector<int>> c3 = add(m5, m6);
std::vector<std::vector<int>> c4 = add(m7, m8);
return combine(c1, c2, c3, c4);
}
然后划分矩阵函数为:
void Matrix::divideMatrix(std::vector<std::vector<int>>& matrix, std::vector<std::vector<int>>& m1,
std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>&
m4)
{
for (int i = 0; i < matrix.size() / 2; i++) {
for (int j = 0; j < matrix.size() / 2; j++) {
m1[i][j] = matrix[i][j];
m2[i][j] = matrix[i][j + matrix.size() / 2];
m3[i][j] = matrix[i + (matrix.size() / 2)][j];
m4[i][j] = matrix[i + (matrix.size() / 2)][j + (matrix.size() / 2)];
}
}
}
合并方法为:
std::vector<std::vector<int>> Matrix::combine(std::vector<std::vector<int>>& m1, std::vector<std::vector<int>>& m2, std::vector<std::vector<int>>& m3, std::vector<std::vector<int>>& m4)
{
std::vector<int> inner(m1[0].size() + m2[0].size());
std::vector<std::vector<int>> result(m1.size() + m2.size(), inner);
for (int i = 0; i < m1.size(); i++) {
for (int j = 0; j < m2.size(); j++) {
result[i][j] = m1[i][j];
result[i][j + m1[0].size()] = m2[i][j];
result[i + m1.size()][j] = m3[i][j];
result[i + m1.size()][j + m3[0].size()] = m4[i][j];
}
}
return result;
}
最后的添加方法是:
std::vector<std::vector<int>> Matrix::add(std::vector<std::vector<int>>& matrix1, std::vector<std::vector<int>>& matrix2)
{
std::vector<std::vector<int>> result(matrix1);
for (int i = 0; i < matrix1.size(); i++) {
for (int j = 0; j < matrix1.size(); j++) {
result[i][j] = matrix1[i][j] + matrix2[i][j];
}
}
return result;
}
朴素函数只是 returns 矩阵 1[0][0] * 矩阵 2[0][0] 在新向量中的乘法
struct matrix_view {
int* m_data = 0;
std::size_t m_stride = 1; // distance between lines
std::size_t m_width = 0;
std::size_t m_height = 0;
int& operator()(std::size_t i, std::size_t j) const {
return *get(i,j);
}
int* get(std::size_t i, std::size_t j) const {
return m_data+(i*m_stride + j);
}
matrix_view slice(std::size_t i, std::size_t j, std::size_t width, std::size_t height ) {
return {get(i,j), stride, width, height};
}
};
struct matrix:matrix_view {
std::vector<int> m_storage;
matrix( std::size_t width, std::size_t height ):
matrix_view{ nullptr, height, width, height },
m_storage( width*height )
{
m_data = m_storage.data();
}
// these are unsafe unless we write them manually, because
// matrix_view gets out of sync.
matrix( matrix const& ) = delete;
matrix& operator=( matrix const& ) = delete;
};
有了这些你就可以避免一堆内存复制。
它仍然可能打不过天真,因为花式乘法通常需要大尺寸才能打败天真。
这是一个未经测试的版本:
namespace Matrix {
struct matrix_view {
int* m_data = 0;
std::size_t m_stride = 1; // distance between lines
std::size_t m_size = 0;
int& operator()(std::size_t i, std::size_t j) const {
return *get(i,j);
}
int* get(std::size_t i, std::size_t j) const {
return m_data+(i*m_stride + j);
}
matrix_view slice(std::size_t i, std::size_t j, std::size_t size ) {
return {get(i,j), m_stride, size};
}
std::size_t count() const { return m_size * m_size; }
};
struct matrix:matrix_view {
std::vector<int> m_storage;
matrix( std::vector<int> storage, std::size_t size ):
matrix_view{ storage.data(), size, size },
m_storage(std::move(storage) )
{
}
explicit matrix( std::size_t size ):
matrix( std::vector<int>(size*size), size )
{}
// these are unsafe unless we write them manually, because
// matrix_view gets out of sync.
matrix( matrix const& o ):
matrix(o.m_storage, o.m_size )
{}
matrix( matrix && o ):
matrix(std::move(o.m_storage), o.m_size )
{
o.m_size = 0;
}
matrix& operator=( matrix const& ) = delete;
};
struct quad_t {
matrix_view sub_11;
matrix_view sub_12;
matrix_view sub_21;
matrix_view sub_22;
};
quad_t quad( matrix_view matrix )
{
quad_t r {
matrix.slice(0,0, matrix.m_size/2 ), matrix.slice(0,matrix.m_size/2, matrix.m_size/2 ),
matrix.slice(matrix.m_size/2,0, matrix.m_size/2 ), matrix.slice(matrix.m_size/2, matrix.m_size/2, matrix.m_size/2 )
};
return r;
}
matrix add(matrix_view matrix1, matrix_view matrix2);
matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4);
matrix naive(matrix_view m1, matrix_view m2);
matrix divideAndConquer(matrix_view matrix1, matrix_view matrix2)
{
if (matrix1.count() == 1) {
return naive(matrix1, matrix2);
}
auto a = quad(matrix1);
auto b = quad(matrix2);
auto m1 = divideAndConquer(a.sub_11, b.sub_11);
auto m2 = divideAndConquer(a.sub_12, b.sub_21);
auto m3 = divideAndConquer(a.sub_11, b.sub_12);
auto m4 = divideAndConquer(a.sub_12, b.sub_22);
auto m5 = divideAndConquer(a.sub_21, b.sub_11);
auto m6 = divideAndConquer(a.sub_22, b.sub_21);
auto m7 = divideAndConquer(a.sub_21, b.sub_12);
auto m8 = divideAndConquer(a.sub_22, b.sub_22);
auto c1 = add(m1, m2);
auto c2 = add(m3, m4);
auto c3 = add(m5, m6);
auto c4 = add(m7, m8);
return combine(c1, c2, c3, c4);
}
matrix combine(matrix_view m1, matrix_view m2, matrix_view m3, matrix_view m4)
{
matrix result = matrix( m1.m_size*2 );
for (std::size_t i = 0; i < m1.m_size; i++) {
for (std::size_t j = 0; j < m1.m_size; j++) {
result(i,j) = m1(i,j);
result(i, j + m1.m_size) = m2(i,j);
result(i + m1.m_size,j) = m3(i,j);
result(i + m1.m_size,j + m3.m_size) = m4(i,j);
}
}
return result;
}
matrix add(matrix_view matrix1, matrix_view matrix2)
{
matrix result(matrix1.m_size);
for (std::size_t i = 0; i < matrix1.m_size; i++) {
for (std::size_t j = 0; j < matrix1.m_size; j++) {
result(i,j) = matrix1(i,j) + matrix2(i,j);
}
}
return result;
}
matrix naive(matrix_view m1, matrix_view m2)
{
matrix r = matrix(m1.m_size);
for (std::size_t i = 0; i < m1.m_size; ++i)
for (std::size_t j = 0; j < m1.m_size; ++j)
for (std::size_t k = 0; k < m1.m_size; ++k)
r(i,j) += m1(i,k) * m2(k,j);
return r;
}
void print( std::ostream& os, matrix_view m1 )
{
for (std::size_t i = 0; i < m1.m_size; ++i)
{
for (std::size_t j = 0; j < m1.m_size; ++j)
std::cout << m1(i,j) << " ";
std::cout << "\n";
}
}
}
int main()
{
Matrix::matrix m1(16), m2(16);
for (int i = 0; i < m1.m_size; ++i)
m1(i,i) = 1;
for (int i = 0; i < m1.m_size; ++i)
m2(m2.m_size-i-1,i) = 1;
auto m3 = Matrix::divideAndConquer(m1, m2);
print(std::cout, m3);
}
重用 "result" 矩阵将给速度带来另一个巨大的提升。另外,在尺寸大于 1 时切换到 naive。