Merge step using binary search in the merge sort algorithm
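We have to convert serial code into parallel code, and according to my research the merge step cannot be parallelized if the standard approach is used. One has to use one of the methods presented in these lecture notes (Algorithm 3). Before trying to parallelize anything, I tried to implement the code and check that it works serially.
But the verification fails, i.e. the array that should be sorted is not sorted. My main problem is finding the rank of an element in an array that may contain duplicates. The algorithm is described in the lecture notes, but I am starting to believe that it may not work if there are multiple instances of the same number.
Suppose we have the two arrays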
A=[0_1,0_2,2_1,5_1,5_2,8_1] and B=[0_3,2_2,2_3,5_3,6_1,6_2]
and in the end we want
C=[0_1,0_2,0_3,2_1,2_2,2_3,5_1,5_2,5_3,6_1,6_2,8_1]
We want to do this by computing ranks, i.e.
rank_C(x)=rank_B(x)+rank_A(x)
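I know the resulting rank_C(x), but I don't know an algorithm that finds rank_A(x) and rank_B(x) separately in logarithmic time using binary search.
We should have: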
0=rank_C(0_1)=rank_A("")+rank_B("")
1=rank_C(0_2)=rank_A("")+rank_B("")
2=rank_C(0_3)=rank_A("")+rank_B("")
3=rank_C(2_1)=rank_A("")+rank_B("")
4=rank_C(2_2)=rank_A("")+rank_B("")
5=rank_C(2_3)=rank_A("")+rank_B("")
6=rank_C(5_1)=rank_A("")+rank_B("")
7=rank_C(5_2)=rank_A("")+rank_B("")
8=rank_C(5_3)=rank_A("")+rank_B("")
9=rank_C(6_1)=rank_A("")+rank_B("")
10=rank_C(6_2)=rank_A("")+rank_B("")
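11=rank_C(8_1)=rank_A("")+rank_B("")
Or a solution to the same problem with the left-hand side incremented by 1.
I don't even know what the solution for the function rank_A(x) should look like, because what if the list A contains, for example, both the smallest and the largest element?
If we assign 0 to the smallest element, that means rankA(smallest)+rankB(smallest)=0+0. However, this means that our range of values is limited to 10 in this case. So we have to assign a 1 to the smallest element and a 12 to the biggest element, but how do we do that? It seems to be impossible, because if we want to get 1 from our addition then one of the functions has to yield 0, and the other function can therefore have a maximum value of 5. This means we can only reach a maximum of 11 in total, but that cannot be right, because we want to assign 12 to our biggest element.
I tried it anyway.
The relevant part is the merge step.
Old code - works: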
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
#include <iostream>
#include <algorithm>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1
#endif
/**
* helper routine: check if array is sorted correctly
*/
bool isSorted(int ref[], int data[], const size_t size){
std::sort(ref, ref + size);
for (size_t idx = 0; idx < size; ++idx){
if (ref[idx] != data[idx]) {
return false;
}
}
return true;
}
/**
* sequential merge step (straight-forward implementation)
*/
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {
long left = begin1;
long right = begin2;
long idx = outBegin;
while (left < end1 && right < end2) {
if (in[left] <= in[right]) {
out[idx] = in[left];
left++;
} else {
out[idx] = in[right];
right++;
}
idx++;
}
while (left < end1) {
out[idx] = in[left];
left++, idx++;
}
while (right < end2) {
out[idx] = in[right];
right++, idx++;
}
}
bool myfunc (long i , long j){return (i<j);}
/**
* sequential MergeSort
*/
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
if (begin < (end - 1)) {
long half =(begin+end) / 2;
{
MsSequential(array, tmp, !inplace, begin, half);
MsSequential(array, tmp, !inplace, half, end);
}
if (inplace){
MsMergeSequential(array, tmp, begin, half, half, end, begin);
} else {
MsMergeSequential(tmp, array, begin, half, half, end, begin);
}
} else if (!inplace) {
tmp[begin] = array[begin];
}
}
/**
* Serial MergeSort
*/
void MsSerial(int *array, int *tmp, const size_t size) {
MsSequential(array, tmp, true, 0, size);
}
/**
* @brief program entry point
*/
int main(int argc, char* argv[]) {
// variables to measure the elapsed time
struct timeval t1, t2;
double etime;
// expect one command line arguments: array size
if (argc != 2) {
printf("Usage: MergeSort.exe <array size> \n");
printf("\n");
return EXIT_FAILURE;
}
else {
const size_t stSize = strtol(argv[1], NULL, 10);
int *data = (int*) malloc(stSize * sizeof(int));
int *tmp = (int*) malloc(stSize * sizeof(int));
int *ref = (int*) malloc(stSize * sizeof(int));
printf("Initialization...\n");
srand(95);
for (size_t idx = 0; idx < stSize; ++idx){
data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
}
std::copy(data, data + stSize, ref);
double dSize = (stSize * sizeof(int)) / 1024 / 1024;
printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);
gettimeofday(&t1, NULL);
{
{
MsSerial(data, tmp, stSize);
}
}
gettimeofday(&t2, NULL);
etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
etime = etime / 1000;
printf("done, took %f sec. Verification...", etime);
if (isSorted(ref, data, stSize)) {
printf(" successful.\n");
}
else {
printf(" FAILED.\n");
}
free(data);
//delete[] data;
free(tmp);
//delete[] tmp;
free(ref);
//delete[] ref;
}
return EXIT_SUCCESS;
}
New code - does not work:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
#include <iostream>
#include <algorithm>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1
#endif
int mybinarysearchleftmost(int *in,int n, int value){
int R=n;
int L=0;
while(R-L>1){
int middle = (R+L)/2;
if(in[middle]==value){while(in[middle]==value && middle > 0){middle=middle-1;}if(middle==0){return -1;} else {return (middle+1);}}
if(in[middle]<value){L=middle+1;}
if(in[middle]>value){R=middle-1;}
}
if(in[R]<value)
{
return R;}
else{
if(in[L]>= value && L==0){return -1;}
else return L;}
}
/**
* helper routine: check if array is sorted correctly
*/
bool isSorted(int ref[], int data[], const size_t size){
std::sort(ref, ref + size);
for (size_t idx = 0; idx < size; ++idx){
if (ref[idx] != data[idx]) {
return false;
}
}
return true;
}
/**
* sequential merge step (straight-forward implementation)
*/
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {
int helperarray[(end2-begin1)];
for(int i=0;i<(end2-begin1);i++){
helperarray[i]=0;
}
int helperarray2[(end2-begin1)];
for(int i=0;i<(end2-begin1);i++){
helperarray2[i]=0;
}
int array1[end1-begin1];
for(int i=0;i<(end1-begin1);i++){
array1[i]=in[begin1+i];
}
printf("[");
for(int i=0;i<(end1-begin1);i++){printf("%u,",array1[i]);}
printf("]\n");
int array2[end2-begin2];
for(int i=0;i<(end1-begin1);i++){
array2[i]=in[begin2+i];
}
printf("[");
for(int i=0;i<(end2-begin2);i++){printf("%u,",array2[i]);}
printf("]\n");
for(int i = 0;i<(end2-begin2);i++){
helperarray[i+mybinarysearchleftmost(array1,(end2-begin2),array2[i])]=array2[i];
helperarray2[i+mybinarysearchleftmost(array1,(end2-begin2),array2[i])]=1;
}
int counter=0;
for(int i = 0;i<(end2-begin1);i++){
if(helperarray2[i]==0){helperarray[i]=array1[counter];counter++;}
}
for(int i=0;i<(end2-begin1);i++){
out[begin1+i]=helperarray[i];
}
}
bool myfunc (long i , long j){return (i<j);}
/**
* sequential MergeSort
*/
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
if (begin < (end - 1)) {
long half =(begin+end) / 2;
{
MsSequential(array, tmp, !inplace, begin, half);
MsSequential(array, tmp, !inplace, half, end);
}
if (inplace){
MsMergeSequential(array, tmp, begin, half, half, end, begin);
} else {
MsMergeSequential(tmp, array, begin, half, half, end, begin);
}
} else if (!inplace) {
tmp[begin] = array[begin];
}
}
/**
* Serial MergeSort
*/
void MsSerial(int *array, int *tmp, const size_t size) {
MsSequential(array, tmp, true, 0, size);
}
/**
* @brief program entry point
*/
int main(int argc, char* argv[]) {
// variables to measure the elapsed time
struct timeval t1, t2;
double etime;
// expect one command line arguments: array size
if (argc != 2) {
printf("Usage: MergeSort.exe <array size> \n");
printf("\n");
return EXIT_FAILURE;
}
else {
const size_t stSize = strtol(argv[1], NULL, 10);
int *data = (int*) malloc(stSize * sizeof(int));
int *tmp = (int*) malloc(stSize * sizeof(int));
int *ref = (int*) malloc(stSize * sizeof(int));
printf("Initialization...\n");
srand(95);
for (size_t idx = 0; idx < stSize; ++idx){
data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
}
std::copy(data, data + stSize, ref);
double dSize = (stSize * sizeof(int)) / 1024 / 1024;
printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);
gettimeofday(&t1, NULL);
{
{
MsSerial(data, tmp, stSize);
}
}
gettimeofday(&t2, NULL);
etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
etime = etime / 1000;
printf("done, took %f sec. Verification...", etime);
if (isSorted(ref, data, stSize)) {
printf(" successful.\n");
}
else {
printf(" FAILED.\n");
}
free(data);
//delete[] data;
free(tmp);
//delete[] tmp;
free(ref);
//delete[] ref;
}
return EXIT_SUCCESS;
}
The final array looks like this, so the merge step went wrong:
[9,4,1,1,1,1,10967,1,10,11,1,1,10967,10967,1,17,10967,1,0,681153680,]
With the help of the answers I finally solved the problem. This is the new working code:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
#include <iostream>
#include <algorithm>
#include <cstdlib>
#include <cstdio>
#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1
#endif
// Takes a sorted array of size n and a value and returns one of the n+1 possible insertion positions.
// If the value equals elements of the array, the position before the first equal element is returned (lower bound).
int binarysearchfindlowerrank(int *in,int n,int value){
    int L=0;
    int R=n;
    // invariant: every element before L is < value, every element from R on is >= value
    while(L<R){
        int middle = L+(R-L)/2;
        if(in[middle]<value){L=middle+1;}
        else{R=middle;}
    }
    return L;
}
// Takes a sorted array of size n and a value and returns one of the n+1 possible insertion positions.
// If the value equals elements of the array, the position after the last equal element is returned (upper bound).
int binarysearchfinduperrank(int *in,int n,int value){
    int L=0;
    int R=n;
    // invariant: every element before L is <= value, every element from R on is > value
    while(L<R){
        int middle = L+(R-L)/2;
        if(in[middle]<=value){L=middle+1;}
        else{R=middle;}
    }
    return L;
}
/**
* helper routine: check if array is sorted correctly
*/
bool isSorted(int ref[], int data[], const size_t size){
std::sort(ref, ref + size);
for (size_t idx = 0; idx < size; ++idx){
if (ref[idx] != data[idx]) {
return false;
}
}
return true;
}
/**
* merge step based on ranks: every element is written to
* (its index in its own run) + (its rank in the other run)
*/
void MsMergeSequential(int *out, int *in, long begin1, long end1, long begin2, long end2, long outBegin) {
    int *run1 = in + begin1;
    int *run2 = in + begin2;
    int n1 = (int)(end1 - begin1);
    int n2 = (int)(end2 - begin2);
    // ties: elements of run2 are placed before equal elements of run1
    for(int i=0;i<n2;i++){
        out[outBegin+i+binarysearchfindlowerrank(run1,n1,run2[i])]=run2[i];
    }
    for(int i=0;i<n1;i++){
        out[outBegin+i+binarysearchfinduperrank(run2,n2,run1[i])]=run1[i];
    }
}
bool myfunc (long i , long j){return (i<j);}
/**
* sequential MergeSort
*/
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
if (begin < (end - 1)) {
long half =(begin+end) / 2;
{
MsSequential(array, tmp, !inplace, begin, half);
MsSequential(array, tmp, !inplace, half, end);
}
if (inplace){
MsMergeSequential(array, tmp, begin, half, half, end, begin);
} else {
MsMergeSequential(tmp, array, begin, half, half, end, begin);
}
} else if (!inplace) {
tmp[begin] = array[begin];
}
}
/**
* Serial MergeSort
*/
void MsSerial(int *array, int *tmp, const size_t size) {
MsSequential(array, tmp, true, 0, size);
}
/**
* @brief program entry point
*/
int main(int argc, char* argv[]) {
// variables to measure the elapsed time
struct timeval t1, t2;
double etime;
// expect one command line arguments: array size
if (argc != 2) {
printf("Usage: MergeSort.exe <array size> \n");
printf("\n");
return EXIT_FAILURE;
}
else {
const size_t stSize = strtol(argv[1], NULL, 10);
int *data = (int*) malloc(stSize * sizeof(int));
int *tmp = (int*) malloc(stSize * sizeof(int));
int *ref = (int*) malloc(stSize * sizeof(int));
printf("Initialization...\n");
srand(95);
for (size_t idx = 0; idx < stSize; ++idx){
data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
}
std::copy(data, data + stSize, ref);
double dSize = (stSize * sizeof(int)) / 1024 / 1024;
printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);
gettimeofday(&t1, NULL);
{
{
MsSerial(data, tmp, stSize);
}
}
gettimeofday(&t2, NULL);
etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
etime = etime / 1000;
printf("done, took %f sec. Verification...", etime);
if (isSorted(ref, data, stSize)) {
printf(" successful.\n");
}
else {
printf(" FAILED.\n");
}
free(data);
//delete[] data;
free(tmp);
//delete[] tmp;
free(ref);
//delete[] ref;
}
return EXIT_SUCCESS;
}
Console output:
Initialization...
Sorting 100 elements of type int (0.000000 MiB)...
done, took 0.000000 sec. Verification... successful.
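The lecture notes on parallel merge seem to be missing lines of pseudocode. This is my guess at a more complete version of the for loops that are to be parallelized: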
for each a ∈ A do
Do a binary search to find where a would be added into B
C[rankA(a) + rankB(a)] = a
end
for each b ∈ B do
Do a binary search to find where b would be added into A
C[rankB(b) + rankA(b)] = b
end
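There are some obvious problems with this approach. Suppose there are 4 processors and the initial array is large enough that each binary search averages 10 probes. In that case the binary search approach is slower, despite the parallelization. Since the searches run in parallel, the order of the search instances is unknown, so each search instance searches all of B[] for rankB(a) or all of A[] for rankA(b). The parallel writes to C[] may interfere with each other, because C[] is shared by all processors.
A more common approach for n processors: split the array into n parts and merge sort each of the n parts in parallel to create n sorted runs. Do n/2 parallel merges of even and odd pairs of the n sorted runs to create n/2 merged runs of twice the size. Do n/4 parallel merges of even and odd pairs of the n/2 merged runs to create n/4 merged runs, and so on. Do a final merge of two sorted runs to create the final sorted output.
Explanation of rank without binary search: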
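The run-based scheme described above could be sketched roughly as follows. This is only an illustration under assumptions: the name `sortInRuns`, the parameter `parts`, and the use of OpenMP `parallel for` pragmas are my own choices, `std::stable_sort` stands in for the per-part merge sort, and `std::inplace_merge` stands in for the pairwise merge. Without `-fopenmp` the pragmas are ignored and the code runs serially with the same result.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical helper: sort `data` by splitting it into `parts` runs,
// sorting the runs independently, then merging neighbouring runs pairwise.
void sortInRuns(std::vector<int>& data, std::size_t parts) {
    const std::size_t n = data.size();
    if (parts == 0) parts = 1;

    // bounds[k] is the start of run k; bounds[parts] == n
    std::vector<std::size_t> bounds(parts + 1);
    for (std::size_t k = 0; k <= parts; ++k) bounds[k] = n * k / parts;

    // Phase 1: every run is sorted on its own, the runs do not overlap.
    #pragma omp parallel for
    for (long k = 0; k < static_cast<long>(parts); ++k) {
        std::stable_sort(data.begin() + bounds[k], data.begin() + bounds[k + 1]);
    }

    // Phase 2: merge runs (0,1), (2,3), ..., then the merged pairs, until one run is left.
    for (std::size_t width = 1; width < parts; width *= 2) {
        const long pairs = static_cast<long>((parts + 2 * width - 1) / (2 * width));
        #pragma omp parallel for
        for (long p = 0; p < pairs; ++p) {
            const std::size_t lo = static_cast<std::size_t>(p) * 2 * width;
            const std::size_t mid = std::min(lo + width, parts);
            const std::size_t hi = std::min(lo + 2 * width, parts);
            // A leftover run without a partner gives an empty second range (no-op).
            std::inplace_merge(data.begin() + bounds[lo],
                               data.begin() + bounds[mid],
                               data.begin() + bounds[hi]);
        }
    }
}
```

Each merge in a given round touches a disjoint slice of `data`, so the merges of one round do not write to the same elements. The last rounds use fewer and fewer threads, which is the price of keeping every individual merge sequential.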
rankA(a) is the index of a in A[]
rankB(b) is the index of b in B[]
// find rankB(a)
for(i = 0; i < size(B[]); i++)
if(a <= B[i])break; // <= for rankB(a)
rankB(a) = i;
// find rankA(b)
for(i = 0; i < size(A[]); i++)
if(b < A[i])break; // < for rankA(b)
rankA(b) = i;
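The two loops above translate almost directly into C++. The following is a minimal sketch, assuming plain `int` elements; the names `rankInB`, `rankInA` and `mergeByLinearRank` are hypothetical and only serve the illustration. It uses linear scans on purpose, to show what the ranks mean before a binary search replaces the scan.

```cpp
#include <cstddef>
#include <vector>

// Rank of an element a of A inside B: the scan stops at the first B[i] with a <= B[i],
// so the result is the number of elements of B that are strictly less than a.
std::size_t rankInB(const std::vector<int>& B, int a) {
    std::size_t i = 0;
    while (i < B.size() && !(a <= B[i])) ++i;
    return i;
}

// Rank of an element b of B inside A: the scan stops at the first A[i] with b < A[i],
// so the result is the number of elements of A that are less than or equal to b.
std::size_t rankInA(const std::vector<int>& A, int b) {
    std::size_t i = 0;
    while (i < A.size() && !(b < A[i])) ++i;
    return i;
}

// Every element goes to (its own index) + (its rank in the other array).
std::vector<int> mergeByLinearRank(const std::vector<int>& A, const std::vector<int>& B) {
    std::vector<int> C(A.size() + B.size());
    for (std::size_t i = 0; i < A.size(); ++i) C[i + rankInB(B, A[i])] = A[i];
    for (std::size_t j = 0; j < B.size(); ++j) C[j + rankInA(A, B[j])] = B[j];
    return C;
}
```

With the arrays from the question, A = [0,0,2,5,5,8] and B = [0,2,2,5,6,6], the elements of A land at positions 0, 1, 3, 6, 7, 11 and the elements of B at positions 2, 4, 5, 8, 9, 10, so every slot of C is written exactly once. The asymmetry between `<=` and `<` is what keeps equal elements from colliding.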
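"So we have to assign a 1 to the smallest element and a 12 to the biggest element, but how do we do that? It seems to be impossible, because if we want to get 1 from our addition then one of the functions has to yield 0, and the other function can therefore have a maximum value of 5."
No, you don't. You want to assign 0 to the smallest element and 11 to the largest.
In your example, the results of the rank functions range over [0, 6], not [0, 5]. When you place a value that is greater than every element you are probing, its correct index is one past the end of the array. You never access that position in the input. For instance, 8_1 sits at index 5 of A and is greater than all 6 elements of B, so it lands at 5 + 6 = 11.
Expressed with functions from the C++ <algorithm> header, although you should probably write lower_bound yourself: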
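#include <algorithm>
#include <cstddef>
#include <iterator>

template <typename ForwardIterator, typename Value>
std::size_t rank(ForwardIterator first, ForwardIterator last, const Value& value) {
    auto probe = std::lower_bound(first, last, value); // find the first element equal or greater
    return std::distance(first, probe);
}

// d_first must allow random access to a range of size1 + size2 elements
template <typename RandomAccessIterator1, typename RandomAccessIterator2, typename OutputIterator>
void merge(RandomAccessIterator1 first1, RandomAccessIterator1 last1, RandomAccessIterator2 first2, RandomAccessIterator2 last2, OutputIterator d_first) {
    const std::size_t size1 = std::distance(first1, last1);
    const std::size_t size2 = std::distance(first2, last2);
    for (std::size_t i = 0; i < size1; ++i) {
        auto value = first1[i];
        auto j = rank(first2, last2, value);
        d_first[i + j] = value;
    }
    for (std::size_t i = 0; i < size2; ++i) {
        auto value = first2[i];
        auto j = rank(first1, last1, value);
        d_first[i + j] = value;
    }
}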
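Note that this does not handle equivalent elements correctly: an element of A and an equal element of B get the same index sum and overwrite each other. To fix that, you have to decide how to break ties. A simple choice is to say that equivalent elements of A come before those of B. To implement this, we can split rank into two: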
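// rank of an element of A within B: number of elements of B strictly less than value
template <typename ForwardIterator, typename Value>
std::size_t rank_A(ForwardIterator first, ForwardIterator last, const Value& value) {
    auto probe = std::lower_bound(first, last, value); // find the first element equal or greater
    return std::distance(first, probe);
}

// rank of an element of B within A: number of elements of A less than or equal to value
template <typename ForwardIterator, typename Value>
std::size_t rank_B(ForwardIterator first, ForwardIterator last, const Value& value) {
    auto probe = std::upper_bound(first, last, value); // find the first element strictly greater
    return std::distance(first, probe);
}

template <typename RandomAccessIterator1, typename RandomAccessIterator2, typename OutputIterator>
void merge(RandomAccessIterator1 first1, RandomAccessIterator1 last1, RandomAccessIterator2 first2, RandomAccessIterator2 last2, OutputIterator d_first) {
    const std::size_t size1 = std::distance(first1, last1);
    const std::size_t size2 = std::distance(first2, last2);
    for (std::size_t i = 0; i < size1; ++i) {
        auto value = first1[i];
        auto j = rank_A(first2, last2, value);
        d_first[i + j] = value;
    }
    for (std::size_t i = 0; i < size2; ++i) {
        auto value = first2[i];
        auto j = rank_B(first1, last1, value);
        d_first[i + j] = value;
    }
}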
This can still be parallelized as before.
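As an illustration of that last point, here is a hedged sketch of the tie-breaking merge with both loops marked for OpenMP. The `Item` alias, the `keyLess` comparator and the name `parallelRankMerge` are assumptions made for this example; the tag in each pair only exists to tell duplicates apart, mirroring the 0_1, 0_2, 0_3 notation from the question. Without `-fopenmp` the pragmas are ignored and the loops run serially.

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

using Item = std::pair<int, int>;  // (key, tag); the tag only identifies duplicates

// Compare by key only, so items with equal keys count as equivalent.
static bool keyLess(const Item& x, const Item& y) { return x.first < y.first; }

// Hypothetical name: rank-based merge in which every iteration writes its own slot of C.
std::vector<Item> parallelRankMerge(const std::vector<Item>& A, const std::vector<Item>& B) {
    std::vector<Item> C(A.size() + B.size());
    const long nA = static_cast<long>(A.size());
    const long nB = static_cast<long>(B.size());

    // Elements of A: count the elements of B that are strictly smaller (lower_bound).
    #pragma omp parallel for
    for (long i = 0; i < nA; ++i) {
        const long j = std::lower_bound(B.begin(), B.end(), A[i], keyLess) - B.begin();
        C[i + j] = A[i];
    }

    // Elements of B: count the elements of A that are smaller or equivalent (upper_bound).
    #pragma omp parallel for
    for (long i = 0; i < nB; ++i) {
        const long j = std::upper_bound(A.begin(), A.end(), B[i], keyLess) - A.begin();
        C[i + j] = B[i];
    }
    return C;
}
```

Because the two rank functions break ties in opposite directions, every index `i + j` is produced exactly once, so the iterations never write to the same element of `C` and need no locking. For the arrays from the question, the result keeps the equal keys in the order 0_1, 0_2, 0_3, which is the C the question asks for.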