合并排序算法中使用二分搜索的合并步骤

Merge step using binary search in merge sort algorithm

我们必须将串行代码转换为并行代码,根据我的研究,如果使用标准方法,则无法并行化合并步骤。必须使用本讲义中介绍的一种方法(算法 3)lecture notes。在尝试使用并行化之前,我尝试实现代码并查看它是否串行运行。

但是验证失败,即应该排序的数组没有排序。我的主要问题是在可能有多个元素的数组中找到一个元素的等级。该算法在讲义中有所描述,但我开始相信如果有多个相同数字的实例,它可能不起作用。

假设我们有两个数组

A=[0_1,0_2,2_1,5_1,5_2,8_1] and B=[0_3,2_2,2_3,5_3,6_1,6_2]

最后我们想要

C=[0_1,0_2,0_3,2_1,2_2,2_3,5_1,5_2,5_3,6_1,6_2,8_1]

我们想通过计算排名来做到这一点,

rank_C(x)=rank_B(x)+rank_A(x)

我知道结果rank_C(x)

但我不知道用二进制搜索在对数时间分别找到rank_A(x)和rank_B(x)的算法

我们应该有:

0=rank_C(0_1)=rank_A("")+rank_B("")
1=rank_C(0_2)=rank_A("")+rank_B("")
2=rank_C(0_3)=rank_A("")+rank_B("")
3=rank_C(2_1)=rank_A("")+rank_B("")
4=rank_C(2_2)=rank_A("")+rank_B("")
5=rank_C(2_3)=rank_A("")+rank_B("")
6=rank_C(5_1)=rank_A("")+rank_B("")
7=rank_C(5_2)=rank_A("")+rank_B("")
8=rank_C(5_3)=rank_A("")+rank_B("")
9=rank_C(6_1)=rank_A("")+rank_B("")
10=rank_C(6_2)=rank_A("")+rank_B("")
11=rank_C(8_6)=rank_A("")+rank_B("")

或者左侧递增 1 的问题的解决方案。

我什至不知道函数 rank_A(x) 的解应该是什么样子,因为如果列表 A 具有例如最小和最大元素怎么办?

如果我们要为最小的元素分配一个0,这意味着rankA(smallest)+rankB(smalles)=0+0。然而,这意味着在这种情况下我们的值范围被限制为 10。所以我们必须为最小的元素分配一个 1,为最大的元素分配一个 12,但是我们该怎么做呢?这似乎是不可能的,因为如果我们想从加法中得到 1,那么其中一个函数必须产生 0,因此另一个函数的最大值可以为 5。这意味着我们总共只能得到一个最大值11,但这不可能,因为我们想将 12 分配给我们最大的元素。

我还是试过了:

相关部分是合并步骤

旧代码 - 工作:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>

#include <iostream>
#include <algorithm>

#include <cstdlib>
#include <cstdio>

#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1



#endif


/**
  * helper routine: check if array is sorted correctly
  */
bool isSorted(int ref[], int data[], const size_t size){
    std::sort(ref, ref + size);
    for (size_t idx = 0; idx < size; ++idx){
        if (ref[idx] != data[idx]) {
            return false;
        }
    }
    return true;
}

/**
  * sequential merge step (straight-forward implementation)
  */
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {
    long left = begin1;
    long right = begin2;
    long idx = outBegin;
    
    while (left < end1 && right < end2) {
        if (in[left] <= in[right]) {
            out[idx] = in[left];    
            left++;
        } else {
            out[idx] = in[right];
            right++;
        }
        idx++;
        }
    
    while (left < end1) {
        out[idx] = in[left];
        left++, idx++;
    }
    while (right < end2) {
        out[idx] = in[right];
        right++, idx++;
    }
}
bool myfunc (long i , long j){return (i<j);}
/**
  * sequential MergeSort
  */
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
  if (begin < (end - 1)) {
           long half =(begin+end) / 2;


            
         {
           

             MsSequential(array, tmp, !inplace, begin, half);

             MsSequential(array, tmp, !inplace, half, end);
              }
 if (inplace){
            MsMergeSequential(array, tmp, begin, half, half, end, begin);
 } else {
            MsMergeSequential(tmp, array, begin, half, half, end, begin);
 }
        
    } else if (!inplace) {

        tmp[begin] = array[begin];
    }
}

/**
  * Serial MergeSort
  */
void MsSerial(int *array, int *tmp, const size_t size) {

    MsSequential(array, tmp, true, 0, size);
}

/**

/**
  * @brief program entry point
  */
int main(int argc, char* argv[]) {
    // variables to measure the elapsed time
    struct timeval t1, t2;
    double etime;

    // expect one command line arguments: array size
    if (argc != 2) {
        printf("Usage: MergeSort.exe <array size> \n");
        printf("\n");
        return EXIT_FAILURE;
    }
    else {
        const size_t stSize = strtol(argv[1], NULL, 10);
        int *data = (int*) malloc(stSize * sizeof(int));
        int *tmp = (int*) malloc(stSize * sizeof(int));     
        int *ref = (int*) malloc(stSize * sizeof(int));
        printf("Initialization...\n");

        srand(95);

        
        for (size_t idx = 0; idx < stSize; ++idx){
            data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
        }
        std::copy(data, data + stSize, ref);

        double dSize = (stSize * sizeof(int)) / 1024 / 1024;
        printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);

        gettimeofday(&t1, NULL);
        
        {
        
        {
        MsSerial(data, tmp, stSize);
        }
        }
        gettimeofday(&t2, NULL);
        etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
        etime = etime / 1000;

        printf("done, took %f sec. Verification...", etime);
        if (isSorted(ref, data, stSize)) {
            printf(" successful.\n");
        }
        else {
            printf(" FAILED.\n");
        }

        free(data);
        //delete[] data;
        free(tmp);
        //delete[] tmp;
        free(ref);
        //delete[] ref;
    }

    return EXIT_SUCCESS;
}

新代码 - 不起作用

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>

#include <iostream>
#include <algorithm>

#include <cstdlib>
#include <cstdio>

#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1



#endif


int mybinarysearchleftmost(int *in,int n, int value){
  

    int R=n;
    int L=0;
    while(R-L>1){
        int middle = (R+L)/2;
    
        if(in[middle]==value){while(in[middle]==value && middle > 0){middle=middle-1;}if(middle==0){return -1;} else {return (middle+1);}}
        if(in[middle]<value){L=middle+1;}
        if(in[middle]>value){R=middle-1;}
    }
    if(in[R]<value)
    {
    return R;}
    else{
      
      if(in[L]>= value && L==0){return -1;}
else return L;}


}


/**
  * helper routine: check if array is sorted correctly
  */
bool isSorted(int ref[], int data[], const size_t size){
    std::sort(ref, ref + size);
    for (size_t idx = 0; idx < size; ++idx){
        if (ref[idx] != data[idx]) {
            return false;
        }
    }
    return true;
}

/**
  * sequential merge step (straight-forward implementation)
  */
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {
    
    
    int helperarray[(end2-begin1)];
    
    for(int i=0;i<(end2-begin1);i++){
        helperarray[i]=0;
    }
    int helperarray2[(end2-begin1)];
    
    for(int i=0;i<(end2-begin1);i++){
        helperarray2[i]=0;
    }
    int array1[end1-begin1];
    
    for(int i=0;i<(end1-begin1);i++){
      array1[i]=in[begin1+i];
    }
    printf("[");
for(int i=0;i<(end1-begin1);i++){printf("%u,",array1[i]);}
printf("]\n");
    int array2[end2-begin2];
    
    for(int i=0;i<(end1-begin1);i++){
      array2[i]=in[begin2+i];
    }
    printf("[");
for(int i=0;i<(end2-begin2);i++){printf("%u,",array2[i]);}
printf("]\n");

    for(int i = 0;i<(end2-begin2);i++){
  helperarray[i+mybinarysearchleftmost(array1,(end2-begin2),array2[i])]=array2[i];
  helperarray2[i+mybinarysearchleftmost(array1,(end2-begin2),array2[i])]=1;
 }
 int counter=0;
 for(int i = 0;i<(end2-begin1);i++){
  if(helperarray2[i]==0){helperarray[i]=array1[counter];counter++;}
 }
 for(int i=0;i<(end2-begin1);i++){
   out[begin1+i]=helperarray[i];
}

  
}
bool myfunc (long i , long j){return (i<j);}
/**
  * sequential MergeSort
  */
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
  if (begin < (end - 1)) {
           long half =(begin+end) / 2;


            
         {
           

             MsSequential(array, tmp, !inplace, begin, half);

             MsSequential(array, tmp, !inplace, half, end);
              }
 if (inplace){
            MsMergeSequential(array, tmp, begin, half, half, end, begin);
 } else {
            MsMergeSequential(tmp, array, begin, half, half, end, begin);
 }
        
    } else if (!inplace) {

        tmp[begin] = array[begin];
    }
}

/**
  * Serial MergeSort
  */
void MsSerial(int *array, int *tmp, const size_t size) {

    MsSequential(array, tmp, true, 0, size);
}

/**

/**
  * @brief program entry point
  */
int main(int argc, char* argv[]) {
    // variables to measure the elapsed time
    struct timeval t1, t2;
    double etime;

    // expect one command line arguments: array size
    if (argc != 2) {
        printf("Usage: MergeSort.exe <array size> \n");
        printf("\n");
        return EXIT_FAILURE;
    }
    else {
        const size_t stSize = strtol(argv[1], NULL, 10);
        int *data = (int*) malloc(stSize * sizeof(int));
        int *tmp = (int*) malloc(stSize * sizeof(int));     
        int *ref = (int*) malloc(stSize * sizeof(int));
        printf("Initialization...\n");

        srand(95);

        
        for (size_t idx = 0; idx < stSize; ++idx){
            data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
        }
        std::copy(data, data + stSize, ref);

        double dSize = (stSize * sizeof(int)) / 1024 / 1024;
        printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);

        gettimeofday(&t1, NULL);
        
        {
        
        {
        MsSerial(data, tmp, stSize);
        }
        }
        gettimeofday(&t2, NULL);
        etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
        etime = etime / 1000;

        printf("done, took %f sec. Verification...", etime);
        if (isSorted(ref, data, stSize)) {
            printf(" successful.\n");
        }
        else {
            printf(" FAILED.\n");
        }

        free(data);
        //delete[] data;
        free(tmp);
        //delete[] tmp;
        free(ref);
        //delete[] ref;
    }

    return EXIT_SUCCESS;
}

最后一个数组看起来像这样,mergestep 出错了:

[9,4,1,1,1,1,10967,1,10,11,1,1,10967,10967,1,17,10967,1,0,681153680,]

我终于在答案的帮助下解决了问题:现在这是新的工作代码:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>

#include <iostream>
#include <algorithm>

#include <cstdlib>
#include <cstdio>

#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1



#endif


//Takes a sorted list of size n and a value, puts the value in one of n+1 possible positions, if value is same to an element of the list take the position after the last occurence of the same element

int binarysearchfindlowerrank(int *in,int n,int value){


    int L=0;
    int R=n;
    while(R-L>1){
        int middle = (R+L)/2;

        if(in[middle]==value){while(in[middle]==value&&middle>0){middle=middle-1;}return middle+1;}
        if(in[middle]<value){L=middle;}
        if(in[middle]>value){R=middle;}
    }

    if(L==0&&in[L]>value){return 0;}
    if(R==n && in[R-1]< value){return n;}
    if(R==n&& in[R-1]>=value){return R-1;}
    if(in[R]<value){return R+1;}
    if(in[L]<value){return R;}
    return L;
}

//Takes a sorted list of size n and a value, puts the value in one of n+1 possible positions, if value is same to an element of the list take the position before the first occurence of the same element



int binarysearchfinduperrank(int *in,int n,int value){

    int L=0;
    int R=n;
    while(R-L>1){

        int middle = (R+L)/2;

        if(in[middle]==value){
            while(in[middle]==value&&middle<n)
            {middle=middle+1;}
            return middle;}
        if(in[middle]<value){L=middle;}
        if(in[middle]>value){R=middle;}
    }

    if(L==0&&in[L]>value){return 0;}
    if(R==n && in[R-1]<= value){return n;}
    if(R==n&& in[R-1]>value){return R-1;}
    if(in[R]<=value){return R+1;}
    if(in[L]<=value){return R;}
    return L;;
}

/**
  * helper routine: check if array is sorted correctly
  */
bool isSorted(int ref[], int data[], const size_t size){
    std::sort(ref, ref + size);
    for (size_t idx = 0; idx < size; ++idx){
        if (ref[idx] != data[idx]) {
            return false;
        }
    }
    return true;
}

/**
  * sequential merge step (straight-forward implementation)
  */
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {

    if(begin1==end2){out[begin1]=in[begin1];}

    else{
    long left = begin1;
    long right = begin2;
    long idx = outBegin;
    int array1[end1-begin1];
    int array2[end2-begin2];
    int merged[end2-begin1];
    for(int i=0;i<(end1-begin1);i++){
        array1[i]=in[begin1+i];
    }
    for(int i=0;i<(end2-begin2);i++){
        array2[i]=in[begin2+i];
    }

    for(int i=0;i<(end2-begin2);i++){
        merged[i+binarysearchfindlowerrank(array1,(end1-begin1),array2[i])]=array2[i];
    }
    for(int i=0;i<(end1-begin1);i++){
        merged[i+binarysearchfinduperrank(array2,(end2-begin2),array1[i])]=array1[i];
    }
    for(int i=0;i<(end2-begin1);i++){
        out[begin1+i]=merged[i];
    }
    }
}
bool myfunc (long i , long j){return (i<j);}
/**
  * sequential MergeSort
  */
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
  if (begin < (end - 1)) {
           long half =(begin+end) / 2;



         {


             MsSequential(array, tmp, !inplace, begin, half);

             MsSequential(array, tmp, !inplace, half, end);
              }
 if (inplace){
            MsMergeSequential(array, tmp, begin, half, half, end, begin);
 } else {
            MsMergeSequential(tmp, array, begin, half, half, end, begin);
 }

    } else if (!inplace) {

        tmp[begin] = array[begin];
    }
}

/**
  * Serial MergeSort
  */
void MsSerial(int *array, int *tmp, const size_t size) {

    MsSequential(array, tmp, true, 0, size);
}

/**

/**
  * @brief program entry point
  */
int main(int argc, char* argv[]) {
    // variables to measure the elapsed time
    struct timeval t1, t2;
    double etime;

    // expect one command line arguments: array size
    if (argc != 2) {
        printf("Usage: MergeSort.exe <array size> \n");
        printf("\n");
        return EXIT_FAILURE;
    }
    else {
        const size_t stSize = strtol(argv[1], NULL, 10);
        int *data = (int*) malloc(stSize * sizeof(int));
        int *tmp = (int*) malloc(stSize * sizeof(int));
        int *ref = (int*) malloc(stSize * sizeof(int));
        printf("Initialization...\n");

        srand(95);


        for (size_t idx = 0; idx < stSize; ++idx){
            data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
        }
        std::copy(data, data + stSize, ref);

        double dSize = (stSize * sizeof(int)) / 1024 / 1024;
        printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);

        gettimeofday(&t1, NULL);

        {

        {
        MsSerial(data, tmp, stSize);
        }
        }
        gettimeofday(&t2, NULL);
        etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
        etime = etime / 1000;

        printf("done, took %f sec. Verification...", etime);
        if (isSorted(ref, data, stSize)) {
            printf(" successful.\n");
        }
        else {
            printf(" FAILED.\n");
        }

        free(data);
        //delete[] data;
        free(tmp);
        //delete[] tmp;
        free(ref);
        //delete[] ref;
    }

    return EXIT_SUCCESS;
}

控制台输出:

Initialization...
Sorting 100 elements of type int (0.000000 MiB)...
done, took 0.000000 sec. Verification... successful.

并行合并的讲义似乎缺少伪代码行。这是我对并行化 for 循环的更完整示例的猜测:

for each a ∈ A do
    Do a binary search to find where a would be added into B
    C[rankA(a) + rankB(a)] = a
end
for each b ∈ B do
    Do a binary search to find where a would be added into B
    C[rankB(b) + rankA(b)] = b
end

这种方法存在一些明显的问题。假设有 4 个处理器,初始数组足够大,每次二分搜索的平均探测数为 10。在这种情况下,二分搜索方法较慢,尽管并行化。由于搜索是并行进行的,因此搜索实例的顺序是未知的,因此每个搜索实例都在整个 B[] 中搜索 RankB(a) 或在所有 A[] 中搜索 RankA(b)。对 C[] 的并行写入可能会相互干扰,因为 C[] 对所有处理器都是通用的。

n 个处理器的一种更常见的方法:将数组拆分为 n 个部分,对 n 个部分中的每个部分并行合并排序以创建 n 个排序运行。对 n 次排序运行的偶数和奇数对进行 n/2 并行合并,以创建两倍大小的合并运行 n/2。对 n/2 合并运行的偶数和奇数对进行 n/4 并行合并以创建 n/4 合并运行...。对两个排序运行进行最终合并以创建最终排序输出。


排名的非二进制搜索解释:

rankA(a) is the index of a in A[]

rankB(b) is the index of b in B[]

// find rankB(a)
for(i = 0; i < size(B[]); i++)
    if(a <= B[i])break;          // <= for rankB(a)
rankB(a) = i;

// find rankA(b)
for(i = 0; i < size(A[]); i++)
    if(b < A[i])break;           // <  for rankA(b)
rankA(b) = i;

So we have to assign a 1 for the smallest element and a 12 to the biggest element, but how do we do that? It seems to be inpossible, because if we want to get 1 from our addition then one of the function has to yield 0, this other function therefore can have a maximum value of 5.

不,你不知道。您想将 0 分配给最小的元素,将 11 分配给最大的元素。

在您的示例中,排名函数的结果范围是 [0, 6],而不是 [0, 5]。当您尝试放置一个大于您正在探测的所有元素的值时,它的正确索引是数组末尾之后的一个。您不访问输入中的那个位置。

使用 C++ <algorithm> header 中的函数表达,尽管您可能应该 lower_bound 自己编写

size_t rank(ForwardIterator first, ForwardIterator last, Value value) {
    auto probe = std::lower_bound(first, last, value);
    return std::distance(first, probe);
}

void merge(RandomAccessIterator1 first1, RandomAccessIterator1 last1, RandomAccessIterator2 first2, RandomAccessIterator2 last2, OutputIterator d_first) {
    for (size_t i = 0; i < std::distance(first1, last1); ++i) {
        auto value = first1[i];
        auto j = rank(first2, last2, value);
        d_first[i + j] = value;
    }
    for (size_t i = 0; i < std::distance(first2, last2); ++i) {
        auto value = first2[i];
        auto j = rank(first1, last1, value);
        d_first[i + j] = value;
    }
}

请注意,这不能正确处理等效元素。为此,您必须决定如何打破平局。一个简单的选择是说 A 的等效元素在 B 的元素之前。为了解决这个问题,我们可以将 rank 拆分为两个

size_t rank_A(ForwardIterator first, ForwardIterator last, Value value) {
    auto probe = std::lower_bound(first, last, value); // find the first element equal or greater
    return std::distance(first, probe);
}

size_t rank_B(ForwardIterator first, ForwardIterator last, Value value) {
    auto probe = std::upper_bound(first, last, value); // find the first element strictly greater
    return std::distance(first, probe);
}

void merge(RandomAccessIterator1 first1, RandomAccessIterator1 last1, RandomAccessIterator2 first2, RandomAccessIterator2 last2, OutputIterator d_first) {
    for (size_t i = 0; i < std::distance(first1, last1); ++i) {
        auto value = first1[i];
        auto j = rank_A(first2, last2, value);
        d_first[i + j] = value;
    }
    for (size_t i = 0; i < std::distance(first2, last2); ++i) {
        auto value = first2[i];
        auto j = rank_B(first1, last1, value);
        d_first[i + j] = value;
    }
}

这仍然可以像以前一样并行化。