C++ MPI: std::merge on arrays

I have an interesting problem here. I have some subarrays that are already sorted, and I need to merge them into one big sorted array. I try to do that in the code below, but I don't get the expected result.

Can anyone tell me what I'm doing wrong? I have no clue; my logic looks fine to me...

Here is my code:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <algorithm>
#include <vector>
#include <iostream>

using namespace std;

#define N 32
#define ROOT 0

int A[N];  // this should be global

void quickSort(int*, int, int);
int partition(int*, int, int);

int main(int argc, char *argv[]) {

    int size;
    int rank;

    vector<int> result(N);

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int count = N / size;

    int *localArray = (int *) malloc(count * sizeof(int));

    if (rank == ROOT) {
        for (int i = 0; i < N; i++) {
            A[i] = rand() % 10;
        }
        // master local copy
        for (int i = 0; i < count; i++)
            localArray[i] = A[i];

        for (int dest = 1; dest < size; ++dest) {
            MPI_Send(&A[dest * count], count, MPI_INT, dest, 1, MPI_COMM_WORLD);
            printf("P0 sent a %d elements to P%d.\n", count, dest);
        }

        int source = 1;

        int sizeResult = count * 2;
        int sizeResult2 = count;

        int tmpVec[sizeResult2];
        int tm[sizeResult];

        MPI_Recv(tmpVec, count, MPI_INT, source, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        for (int source = 2; source < size; source++) {
            MPI_Recv(localArray, count, MPI_INT, source, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            //-------------------------------HERE IS THE PROBLEM---------------------------
            merge(tmpVec, tmpVec + sizeResult2, localArray, localArray + count, tm);

            sizeResult2 = sizeResult;
            for (int i = 0; i < sizeResult; i++) {
                tmpVec[i] = tm[i];
                cout << tm[i] << " ";
            }
            cout << endl;
            sizeResult += count;
            //-------------------------------------------------------------------------------
        }

        for (int i = 0; i < sizeResult2; i++)
            cout << tmpVec[i] << " ";
        cout << endl << sizeResult2 << endl;

        for (int i = 0; i < N; i++)
            cout << A[i] << " ";

    }
    else {
        MPI_Recv(localArray, count, MPI_INT, ROOT, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        quickSort(localArray, 0, count);

        MPI_Send(localArray, count, MPI_INT, ROOT, 2, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}
void quickSort(int* A, int p, int q) {
    int r;
    if (p < q) {
        r = partition(A, p, q);
        quickSort(A, p, r);
        quickSort(A, r + 1, q);
    }
}

int partition(int* A, int p, int q) {
    int x = A[p];
    int i = p;
    int j;

    for (j = p + 1; j < q; j++) {
        if (A[j] <= x) {
            i = i + 1;
            swap(A[i], A[j]);
        }

    }
    swap(A[i], A[p]);
    return i;
}

As you can see, I try to merge the first subarray with the second one, then merge their result with the third subarray, and so on...
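In other words, the pattern I am going for, stripped of the MPI calls and using std::vector buffers purely for illustration, is roughly this:

#include <algorithm>
#include <utility>
#include <vector>

// Fold already-sorted chunks into one sorted array, one chunk at a time.
std::vector<int> merge_chunks(const std::vector<std::vector<int>>& chunks) {
    std::vector<int> merged;
    for (const auto& chunk : chunks) {
        std::vector<int> tmp;
        std::merge(merged.begin(), merged.end(),
                   chunk.begin(), chunk.end(),
                   std::back_inserter(tmp));   // destination grows as needed
        merged = std::move(tmp);
    }
    return merged;
}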

You are not allocating enough memory in your intermediate buffers (tmpVec and tm): they are sized once for count and 2 * count elements, but the merged result grows by count elements with every received chunk, so copying it back quickly overruns both arrays. To avoid this, simply use std::vector instead of low-level arrays:

std::vector<int> tmpVec(count);   // holds the already-merged prefix
std::vector<int> tm;              // scratch buffer for each merge step

MPI_Recv(tmpVec.data(), count, MPI_INT, source, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

for (int source = 2; source < size; source++) {
    MPI_Recv(localArray, count, MPI_INT, source, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    // merge the already-sorted prefix with the newly received chunk;
    // back_inserter grows tm to exactly the required size
    merge(tmpVec.begin(), tmpVec.end(), localArray, localArray + count, std::back_inserter(tm));

    tmpVec = tm;
    tm.resize(0);
}
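Because std::back_inserter grows tm as needed, none of the manual size bookkeeping (sizeResult, sizeResult2) is required anymore. If you want to avoid repeated reallocations, the final size is known up front, so (a small sketch, assuming the root eventually holds all N elements) you can reserve the capacity once before the loop:

std::vector<int> tm;
tm.reserve(N);   // one allocation up front; back_inserter then appends without reallocating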

As a general tip: think more carefully about your choice of variable names. Code is much easier to reason about when the names are descriptive.

Coming back to your actual problem: use scatter and gather! A recursive merge sort that applies std::inplace_merge to the gathered, contiguous array is quite straightforward. The trick is to keep the offset of each already-sorted local part and to append an "end" offset to that array.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <algorithm>

#define N 16 
int A[N]; 

// This is basically textbook recursive merge sort using std::inplace_merge,
// but it takes the offsets of segments that are already sorted
void merge_indexed(int data[], const int offsets[], size_t index_begin, size_t index_end)
{
    if (index_end - index_begin > 1) {
        auto index_middle = index_begin + (index_end - index_begin) / 2;
        merge_indexed(data, offsets, index_begin, index_middle);
        merge_indexed(data, offsets, index_middle, index_end);
        std::inplace_merge(&data[offsets[index_begin]], &data[offsets[index_middle]], &data[offsets[index_end]]);
    }
}

int main(int argc, char *argv[]) {
    int size;
    int rank;
    const int ROOT = 0; 

    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    auto remainder = N % size;
    int local_counts[size], offsets[size + 1];
    int sum = 0;
    for (int i = 0; i < size; i++) {
        local_counts[i] = N / size;
        if (remainder > 0) {
            local_counts[i] += 1;
            remainder--;
        }
        offsets[i] = sum;
        sum += local_counts[i];
    }
    offsets[size] = sum;

    int localArray[local_counts[rank]];

    if (rank == ROOT) {
        for (int i = 0; i < N; i++) {
            A[i] = rand() % 10;
        }
    }

    MPI_Scatterv(A, local_counts, offsets, MPI_INT, localArray, local_counts[rank], MPI_INT, ROOT, MPI_COMM_WORLD);

    std::sort(&localArray[0], &localArray[local_counts[rank]]);

    MPI_Gatherv(localArray, local_counts[rank], MPI_INT, A, local_counts, offsets, MPI_INT, ROOT, MPI_COMM_WORLD);

    if (rank == ROOT) {
        //---------------Merge the sorted sections gathered into A-------------------
        merge_indexed(A, offsets, 0, size);
    }

    MPI_Finalize();
    return 0;
}
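For a quick sanity check at the root, you could, for instance, confirm that the gathered array is fully sorted after the merge (an illustrative addition, not part of the answer above; std::is_sorted comes from <algorithm>, which is already included):

    if (rank == ROOT) {
        merge_indexed(A, offsets, 0, size);
        // illustrative check: the whole array should now be sorted
        printf("sorted: %s\n", std::is_sorted(A, A + N) ? "yes" : "no");
    }

With Open MPI this builds and runs with something like mpicxx sort.cpp -o sort && mpirun -np 4 ./sort (the file name is just an example).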