线程 radixsort 不快

Threaded radixsort not faster

我无法理解为什么我的实现没有单线程快。

我正在尝试使用一定数量 (n) 的线程实现按位 MSB 基数排序(目前正在工作)。 在我的方法中,我创建了一个新线程,直到递归中的 log2(n)。但这似乎只是增加了创建线程的时间原因。 我猜创建的线程不能同时在数组上工作,即使其中有责任分离。

如果有人能解释一下,我将不胜感激。谢谢!!

更新::

事实证明,我错过了 Ulrich 评论的最明显的解决方案。我正在使用一个 VM,它被锁定到一个 CPU,我不知道它确实如此。更改后,我得到了所需的基数排序加速。

感谢大家对该主题的评论和一般信息。 ^^

最大线程数应限制为系统内核数,如果每个内核都具有超线程,则应限制为内核数的两倍。您已经更新了您的问题以注意您只有一个核心使用虚拟机,而多线程版本使用多个核心更快。

其他潜在问题:如果数据分布不均匀,则拆分为 bins 将导致 bins 大小不均匀。使用大于 2 的基数,例如 256,会稍微减少这种影响。在初始阶段,可能会出现缓存行冲突,每次写入一个核心上的缓存行都会使其他核心上该缓存行的所有实例无效。

示例代码使用单个线程执行 MSD 优先基数排序 base 256,以创建 256 个 bin,然后使用 4 个线程使用 LSD 优先基数排序对其进行排序。此示例使用 Windows 特定线程 API。这依赖于具有合理统一的数据,以便 256 个 bin 的大小大致相等。我没有尝试对用于创建 256 个 bin 的初始 MSD pass 使用多线程,因为一个潜在的问题是索引更新数组上的缓存行冲突,这需要原子增量操作,这可能会限制性能受益。

#include <iostream>
#include <windows.h>

#define COUNT (6000*6000)            // number of values to sort

#define QP 1        // if != 0, use queryperformance for timer

#if QP
#include <math.h>
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#else
#include <ctime>
#endif

#if QP
static LI64     liQPFrequency;  // cpu counter values
static LI64     liStartTime;
static LI64     liStopTime;
static double   dQPFrequency;
static double   dStartTime;
static double   dStopTime;
static double   dElapsedTime;
#else
clock_t ctTimeStart;            // clock values
clock_t ctTimeStop;
#endif

typedef unsigned       int uint32_t;
typedef unsigned long long uint64_t;

static HANDLE hs0;                      // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;

static HANDLE ht1;                      // thread handles
static HANDLE ht2;
static HANDLE ht3;

static uint64_t *pa, *pb;               // ptrs to arrays
static uint32_t aIndex[260];            // start + end indexes, 256 bins
static uint32_t aIi;                    // current index for 4 bins

static DWORD WINAPI Thread0(LPVOID lpvoid);

void RadixSort(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t i,m,n;
uint64_t u;
    for(i = 0; i < count; i++)          // generate histogram
        aIndex[(size_t)(a[i] >> 56)]++;
    m = 0;                              // convert to indexes
    for(i = 0; i < 257; i++){           //  including end of bin[255]
        n = aIndex[i];
        aIndex[i] = m;
        m += n;
    }
    for(i = 0; i < count; i++){         // sort by msb
        u = a[i];
        m = (uint32_t)(u >> 56);
        b[aIndex[m]++] = u;
    }
    for(i = 256; i; i--)                // restore aIndex
        aIndex[i] = aIndex[i-1];
    aIndex[0] = 0;
    for(aIi = 0; aIi < 256; aIi += 4){
        ReleaseSemaphore(hs1, 1, NULL);     // start threads
        ReleaseSemaphore(hs2, 1, NULL);
        ReleaseSemaphore(hs3, 1, NULL);
        Thread0(NULL);
        WaitForSingleObject(hs0, INFINITE); // wait for threads done
        WaitForSingleObject(hs0, INFINITE);
        WaitForSingleObject(hs0, INFINITE);
    }        
}

void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
{
uint32_t mIndex[7][256] = {0};          // count / index matrix
uint32_t i,j,m,n;
uint64_t u;
    for(i = 0; i < count; i++){         // generate histograms
        u = a[i];
        for(j = 0; j < 7; j++){
            mIndex[j][(size_t)(u & 0xff)]++;
            u >>= 8;
        }
    }
    for(j = 0; j < 7; j++){             // convert to indices
        m = 0;
        for(i = 0; i < 256; i++){
            n = mIndex[j][i];
            mIndex[j][i] = m;
            m += n;
        }       
    }
    for(j = 0; j < 7; j++){             // radix sort
        for(i = 0; i < count; i++){     //  sort by current lsb
            u = a[i];
            m = (size_t)(u>>(j<<3))&0xff;
            b[mIndex[j][m]++] = u;
        }
        std::swap(a, b);                //  swap ptrs
    }
}

static DWORD WINAPI Thread0(LPVOID lpvoid)
{
    RadixSort7(pb + aIndex[aIi+0], pa + aIndex[aIi+0], aIndex[aIi+1]-aIndex[aIi+0]);
    return 0;
}

static DWORD WINAPI Thread1(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs1, INFINITE); // wait for semaphore
        RadixSort7(pb + aIndex[aIi+1], pa + aIndex[aIi+1], aIndex[aIi+2]-aIndex[aIi+1]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(aIi == 252)                      // exit if all bins done
            return 0;
    }
}

static DWORD WINAPI Thread2(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs2, INFINITE); // wait for semaphore
        RadixSort7(pb + aIndex[aIi+2], pa + aIndex[aIi+2], aIndex[aIi+3]-aIndex[aIi+2]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(aIi == 252)                      // exit if all bins done
            return 0;
    }
}

static DWORD WINAPI Thread3(LPVOID lpvoid)
{
    while(1){
        WaitForSingleObject(hs3, INFINITE); // wait for semaphore
        RadixSort7(pb + aIndex[aIi+3], pa + aIndex[aIi+3], aIndex[aIi+4]-aIndex[aIi+3]);
        ReleaseSemaphore(hs0, 1, NULL);     // indicate done
        if(aIi == 252)                      // exit if all bins done
            return 0;
    }
}

uint64_t rnd64()                        // random 64 bit integer
{
static uint64_t r = 1ull;
    r = r * 6364136223846793005ull + 1442695040888963407ull;
    return r;
}

int main( )
{
    uint64_t * a = new uint64_t [COUNT];    // allocate data array
    uint64_t * b = new uint64_t [COUNT];    // allocate temp array
    size_t i;
    pa = a;                                 // set global ptrs
    pb = b;

    hs0 = CreateSemaphore(NULL,0,7,NULL);   // create semaphores
    hs1 = CreateSemaphore(NULL,0,1,NULL);
    hs2 = CreateSemaphore(NULL,0,1,NULL);
    hs3 = CreateSemaphore(NULL,0,1,NULL);

    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);  // create threads
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);

    for(i = 0; i < COUNT; i++){             // generate data
        a[i] = rnd64();
    }

#if QP
    QueryPerformanceFrequency(&liQPFrequency);
    dQPFrequency = (double)liQPFrequency.QuadPart;
    timeBeginPeriod(1);                     // set ticker to 1000 hz
    Sleep(128);                             // wait for it to settle
    QueryPerformanceCounter(&liStartTime);
#else
    ctTimeStart = clock();
#endif

    RadixSort(a, b, COUNT);

#if QP
    QueryPerformanceCounter(&liStopTime);
    dStartTime = (double)liStartTime.QuadPart;
    dStopTime  = (double)liStopTime.QuadPart;
    dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
    timeEndPeriod(1);                       // restore ticker to default
    std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
    ctTimeStop = clock();
    std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif

    WaitForSingleObject(ht1, INFINITE); // wait for threads
    WaitForSingleObject(ht2, INFINITE);
    WaitForSingleObject(ht3, INFINITE);
    CloseHandle(ht1);                   // close threads
    CloseHandle(ht2);
    CloseHandle(ht3);
    CloseHandle(hs0);                   // close semaphores
    CloseHandle(hs1);
    CloseHandle(hs2);
    CloseHandle(hs3);
    
    for(i = 1; i < COUNT; i++){
        if(a[i] < a[i-1]){
            break;}}
    if(i == COUNT)
        std::cout << "passed" << std::endl;
    else
        std::cout << "failed" << std::endl;

    delete[] b;
    delete[] a;
    return 0;
}