线程 radixsort 不快
Threaded radixsort not faster
我无法理解为什么我的实现没有单线程快。
我正在尝试使用一定数量 (n) 的线程实现按位 MSB 基数排序(目前正在工作)。
在我的方法中,我创建了一个新线程,直到递归中的 log2(n)。但这似乎只是增加了创建线程的时间原因。
我猜创建的线程不能同时在数组上工作,即使其中有责任分离。
如果有人能解释一下,我将不胜感激。谢谢!!
更新::
事实证明,我错过了 Ulrich 评论的最明显的解决方案。我正在使用一个 VM,它被锁定到一个 CPU,我不知道它确实如此。更改后,我得到了所需的基数排序加速。
感谢大家对该主题的评论和一般信息。 ^^
最大线程数应限制为系统内核数,如果每个内核都具有超线程,则应限制为内核数的两倍。您已经更新了您的问题以注意您只有一个核心使用虚拟机,而多线程版本使用多个核心更快。
其他潜在问题:如果数据分布不均匀,则拆分为 bins 将导致 bins 大小不均匀。使用大于 2 的基数,例如 256,会稍微减少这种影响。在初始阶段,可能会出现缓存行冲突,每次写入一个核心上的缓存行都会使其他核心上该缓存行的所有实例无效。
示例代码使用单个线程执行 MSD 优先基数排序 base 256,以创建 256 个 bin,然后使用 4 个线程使用 LSD 优先基数排序对其进行排序。此示例使用 Windows 特定线程 API。这依赖于具有合理统一的数据,以便 256 个 bin 的大小大致相等。我没有尝试对用于创建 256 个 bin 的初始 MSD pass 使用多线程,因为一个潜在的问题是索引更新数组上的缓存行冲突,这需要原子增量操作,这可能会限制性能受益。
#include <iostream>
#include <windows.h>
#define COUNT (6000*6000) // number of values to sort
#define QP 1 // if != 0, use queryperformance for timer
#if QP
#include <math.h>
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#else
#include <ctime>
#endif
#if QP
static LI64 liQPFrequency; // cpu counter values
static LI64 liStartTime;
static LI64 liStopTime;
static double dQPFrequency;
static double dStartTime;
static double dStopTime;
static double dElapsedTime;
#else
clock_t ctTimeStart; // clock values
clock_t ctTimeStop;
#endif
typedef unsigned int uint32_t;
typedef unsigned long long uint64_t;
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
static uint64_t *pa, *pb; // ptrs to arrays
static uint32_t aIndex[260]; // start + end indexes, 256 bins
static uint32_t aIi; // current index for 4 bins
static DWORD WINAPI Thread0(LPVOID lpvoid);
void RadixSort(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t i,m,n;
uint64_t u;
for(i = 0; i < count; i++) // generate histogram
aIndex[(size_t)(a[i] >> 56)]++;
m = 0; // convert to indexes
for(i = 0; i < 257; i++){ // including end of bin[255]
n = aIndex[i];
aIndex[i] = m;
m += n;
}
for(i = 0; i < count; i++){ // sort by msb
u = a[i];
m = (uint32_t)(u >> 56);
b[aIndex[m]++] = u;
}
for(i = 256; i; i--) // restore aIndex
aIndex[i] = aIndex[i-1];
aIndex[0] = 0;
for(aIi = 0; aIi < 256; aIi += 4){
ReleaseSemaphore(hs1, 1, NULL); // start threads
ReleaseSemaphore(hs2, 1, NULL);
ReleaseSemaphore(hs3, 1, NULL);
Thread0(NULL);
WaitForSingleObject(hs0, INFINITE); // wait for threads done
WaitForSingleObject(hs0, INFINITE);
WaitForSingleObject(hs0, INFINITE);
}
}
void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t mIndex[7][256] = {0}; // count / index matrix
uint32_t i,j,m,n;
uint64_t u;
for(i = 0; i < count; i++){ // generate histograms
u = a[i];
for(j = 0; j < 7; j++){
mIndex[j][(size_t)(u & 0xff)]++;
u >>= 8;
}
}
for(j = 0; j < 7; j++){ // convert to indices
m = 0;
for(i = 0; i < 256; i++){
n = mIndex[j][i];
mIndex[j][i] = m;
m += n;
}
}
for(j = 0; j < 7; j++){ // radix sort
for(i = 0; i < count; i++){ // sort by current lsb
u = a[i];
m = (size_t)(u>>(j<<3))&0xff;
b[mIndex[j][m]++] = u;
}
std::swap(a, b); // swap ptrs
}
}
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
RadixSort7(pb + aIndex[aIi+0], pa + aIndex[aIi+0], aIndex[aIi+1]-aIndex[aIi+0]);
return 0;
}
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs1, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+1], pa + aIndex[aIi+1], aIndex[aIi+2]-aIndex[aIi+1]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs2, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+2], pa + aIndex[aIi+2], aIndex[aIi+3]-aIndex[aIi+2]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs3, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+3], pa + aIndex[aIi+3], aIndex[aIi+4]-aIndex[aIi+3]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
uint64_t rnd64() // random 64 bit integer
{
static uint64_t r = 1ull;
r = r * 6364136223846793005ull + 1442695040888963407ull;
return r;
}
int main( )
{
uint64_t * a = new uint64_t [COUNT]; // allocate data array
uint64_t * b = new uint64_t [COUNT]; // allocate temp array
size_t i;
pa = a; // set global ptrs
pb = b;
hs0 = CreateSemaphore(NULL,0,7,NULL); // create semaphores
hs1 = CreateSemaphore(NULL,0,1,NULL);
hs2 = CreateSemaphore(NULL,0,1,NULL);
hs3 = CreateSemaphore(NULL,0,1,NULL);
ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0); // create threads
ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
for(i = 0; i < COUNT; i++){ // generate data
a[i] = rnd64();
}
#if QP
QueryPerformanceFrequency(&liQPFrequency);
dQPFrequency = (double)liQPFrequency.QuadPart;
timeBeginPeriod(1); // set ticker to 1000 hz
Sleep(128); // wait for it to settle
QueryPerformanceCounter(&liStartTime);
#else
ctTimeStart = clock();
#endif
RadixSort(a, b, COUNT);
#if QP
QueryPerformanceCounter(&liStopTime);
dStartTime = (double)liStartTime.QuadPart;
dStopTime = (double)liStopTime.QuadPart;
dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
timeEndPeriod(1); // restore ticker to default
std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
ctTimeStop = clock();
std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif
WaitForSingleObject(ht1, INFINITE); // wait for threads
WaitForSingleObject(ht2, INFINITE);
WaitForSingleObject(ht3, INFINITE);
CloseHandle(ht1); // close threads
CloseHandle(ht2);
CloseHandle(ht3);
CloseHandle(hs0); // close semaphores
CloseHandle(hs1);
CloseHandle(hs2);
CloseHandle(hs3);
for(i = 1; i < COUNT; i++){
if(a[i] < a[i-1]){
break;}}
if(i == COUNT)
std::cout << "passed" << std::endl;
else
std::cout << "failed" << std::endl;
delete[] b;
delete[] a;
return 0;
}
我无法理解为什么我的实现没有单线程快。
我正在尝试使用一定数量 (n) 的线程实现按位 MSB 基数排序(目前正在工作)。 在我的方法中,我创建了一个新线程,直到递归中的 log2(n)。但这似乎只是增加了创建线程的时间原因。 我猜创建的线程不能同时在数组上工作,即使其中有责任分离。
如果有人能解释一下,我将不胜感激。谢谢!!
更新::
事实证明,我错过了 Ulrich 评论的最明显的解决方案。我正在使用一个 VM,它被锁定到一个 CPU,我不知道它确实如此。更改后,我得到了所需的基数排序加速。
感谢大家对该主题的评论和一般信息。 ^^
最大线程数应限制为系统内核数,如果每个内核都具有超线程,则应限制为内核数的两倍。您已经更新了您的问题以注意您只有一个核心使用虚拟机,而多线程版本使用多个核心更快。
其他潜在问题:如果数据分布不均匀,则拆分为 bins 将导致 bins 大小不均匀。使用大于 2 的基数,例如 256,会稍微减少这种影响。在初始阶段,可能会出现缓存行冲突,每次写入一个核心上的缓存行都会使其他核心上该缓存行的所有实例无效。
示例代码使用单个线程执行 MSD 优先基数排序 base 256,以创建 256 个 bin,然后使用 4 个线程使用 LSD 优先基数排序对其进行排序。此示例使用 Windows 特定线程 API。这依赖于具有合理统一的数据,以便 256 个 bin 的大小大致相等。我没有尝试对用于创建 256 个 bin 的初始 MSD pass 使用多线程,因为一个潜在的问题是索引更新数组上的缓存行冲突,这需要原子增量操作,这可能会限制性能受益。
#include <iostream>
#include <windows.h>
#define COUNT (6000*6000) // number of values to sort
#define QP 1 // if != 0, use queryperformance for timer
#if QP
#include <math.h>
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#else
#include <ctime>
#endif
#if QP
static LI64 liQPFrequency; // cpu counter values
static LI64 liStartTime;
static LI64 liStopTime;
static double dQPFrequency;
static double dStartTime;
static double dStopTime;
static double dElapsedTime;
#else
clock_t ctTimeStart; // clock values
clock_t ctTimeStop;
#endif
typedef unsigned int uint32_t;
typedef unsigned long long uint64_t;
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
static uint64_t *pa, *pb; // ptrs to arrays
static uint32_t aIndex[260]; // start + end indexes, 256 bins
static uint32_t aIi; // current index for 4 bins
static DWORD WINAPI Thread0(LPVOID lpvoid);
void RadixSort(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t i,m,n;
uint64_t u;
for(i = 0; i < count; i++) // generate histogram
aIndex[(size_t)(a[i] >> 56)]++;
m = 0; // convert to indexes
for(i = 0; i < 257; i++){ // including end of bin[255]
n = aIndex[i];
aIndex[i] = m;
m += n;
}
for(i = 0; i < count; i++){ // sort by msb
u = a[i];
m = (uint32_t)(u >> 56);
b[aIndex[m]++] = u;
}
for(i = 256; i; i--) // restore aIndex
aIndex[i] = aIndex[i-1];
aIndex[0] = 0;
for(aIi = 0; aIi < 256; aIi += 4){
ReleaseSemaphore(hs1, 1, NULL); // start threads
ReleaseSemaphore(hs2, 1, NULL);
ReleaseSemaphore(hs3, 1, NULL);
Thread0(NULL);
WaitForSingleObject(hs0, INFINITE); // wait for threads done
WaitForSingleObject(hs0, INFINITE);
WaitForSingleObject(hs0, INFINITE);
}
}
void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t mIndex[7][256] = {0}; // count / index matrix
uint32_t i,j,m,n;
uint64_t u;
for(i = 0; i < count; i++){ // generate histograms
u = a[i];
for(j = 0; j < 7; j++){
mIndex[j][(size_t)(u & 0xff)]++;
u >>= 8;
}
}
for(j = 0; j < 7; j++){ // convert to indices
m = 0;
for(i = 0; i < 256; i++){
n = mIndex[j][i];
mIndex[j][i] = m;
m += n;
}
}
for(j = 0; j < 7; j++){ // radix sort
for(i = 0; i < count; i++){ // sort by current lsb
u = a[i];
m = (size_t)(u>>(j<<3))&0xff;
b[mIndex[j][m]++] = u;
}
std::swap(a, b); // swap ptrs
}
}
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
RadixSort7(pb + aIndex[aIi+0], pa + aIndex[aIi+0], aIndex[aIi+1]-aIndex[aIi+0]);
return 0;
}
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs1, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+1], pa + aIndex[aIi+1], aIndex[aIi+2]-aIndex[aIi+1]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs2, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+2], pa + aIndex[aIi+2], aIndex[aIi+3]-aIndex[aIi+2]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
while(1){
WaitForSingleObject(hs3, INFINITE); // wait for semaphore
RadixSort7(pb + aIndex[aIi+3], pa + aIndex[aIi+3], aIndex[aIi+4]-aIndex[aIi+3]);
ReleaseSemaphore(hs0, 1, NULL); // indicate done
if(aIi == 252) // exit if all bins done
return 0;
}
}
uint64_t rnd64() // random 64 bit integer
{
static uint64_t r = 1ull;
r = r * 6364136223846793005ull + 1442695040888963407ull;
return r;
}
int main( )
{
uint64_t * a = new uint64_t [COUNT]; // allocate data array
uint64_t * b = new uint64_t [COUNT]; // allocate temp array
size_t i;
pa = a; // set global ptrs
pb = b;
hs0 = CreateSemaphore(NULL,0,7,NULL); // create semaphores
hs1 = CreateSemaphore(NULL,0,1,NULL);
hs2 = CreateSemaphore(NULL,0,1,NULL);
hs3 = CreateSemaphore(NULL,0,1,NULL);
ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0); // create threads
ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
for(i = 0; i < COUNT; i++){ // generate data
a[i] = rnd64();
}
#if QP
QueryPerformanceFrequency(&liQPFrequency);
dQPFrequency = (double)liQPFrequency.QuadPart;
timeBeginPeriod(1); // set ticker to 1000 hz
Sleep(128); // wait for it to settle
QueryPerformanceCounter(&liStartTime);
#else
ctTimeStart = clock();
#endif
RadixSort(a, b, COUNT);
#if QP
QueryPerformanceCounter(&liStopTime);
dStartTime = (double)liStartTime.QuadPart;
dStopTime = (double)liStopTime.QuadPart;
dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
timeEndPeriod(1); // restore ticker to default
std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
ctTimeStop = clock();
std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif
WaitForSingleObject(ht1, INFINITE); // wait for threads
WaitForSingleObject(ht2, INFINITE);
WaitForSingleObject(ht3, INFINITE);
CloseHandle(ht1); // close threads
CloseHandle(ht2);
CloseHandle(ht3);
CloseHandle(hs0); // close semaphores
CloseHandle(hs1);
CloseHandle(hs2);
CloseHandle(hs3);
for(i = 1; i < COUNT; i++){
if(a[i] < a[i-1]){
break;}}
if(i == COUNT)
std::cout << "passed" << std::endl;
else
std::cout << "failed" << std::endl;
delete[] b;
delete[] a;
return 0;
}