并行插入排序,天真的尝试,使用 pthreads 的失败尝试,线程未并行排序,但使用串行工作正常

Paralleling Insertion Sort, Naive Attempt, Failed Attempt With pthreads, Threads Not Sorted In Parallel, But Work Fine With Serial

上下文

您好!

我曾尝试使用 pthreads,并决定实施插入排序以查看性能差异,但我遇到了一个奇怪的错误,我不确定该去哪里或遇到什么问题真的运行宁成。

步骤

对于上下文,我为并行插入排序所做的如下,

  1. 制作一个函数以单独包含它
  2. 向该函数发送一个结构容器,i) 指向数组本身的指针,ii) 数组 尺寸,
  3. 得到了核心数(让我们通过我机器上的函数调用这个 numberOfCores(是的,输出是正确的,我有 4 个核心)
  4. 已创建 numberOfCores pthread_t 个对象
  5. 请记住,最后一个线程没有必要处理包含相同数量元素的数组(尝试相应地将余额分配给线程)
  6. 所以我创建了一个二维矩阵,行 == numberOfCores - 1,列 == floor(sizeOfArray / numberOfCores),我用不同的测试输入一次又一次地检查,这些都正确分配,
  7. 创建了一个 lastArray 数组,大小为 lastArraySize = (sizeOfCompleteArray / numberOfCores) + (sizeOfCompleteArray % numberOfCores)
  8. 然后我将原始数组拆分为subArrays,二维矩阵
  9. 然后我将数组的最后一部分拆分为 lastArray
  10. 然后我打包了各自的数组和它们的大小,并将线程分配给 运行 简单的 insertionSort 函数,发送它们各自的 packed 数据。我的计划是 merge 不同的数组,在它们被每个单独的线程排序后。我知道编写更高效的代码库是 100% 可能的,但这只是小事,我不想在这上面花太多钱。请查看下面的代码逐行检查。
  11. 然后我使用 pthread_join 得到排序的 numberOfCores 数组
  12. 然后我检查每个是否排序。

问题

问题是这样的,

  1. 如果我从第 9 步(如上所述)开始使用循环按顺序对上述数组进行排序,所有数组都会按预期排序。这里没问题
  2. 但是,如果我尝试我的并行版本,我会遇到意想不到的结果。有时 2 个被排序,有时只有 1 个,有时是 3 个,但从来没有 4 个。为什么它总是给我不可靠的结果现在我无法理解。
  3. 数组[0] 始终未排序。我不知道为什么。当我使用上述串行方法时它确实得到排序,但从未使用并行方法。

代码


// def for InsertionSortPacking, mentioned below 
struct InsertionSortPacking
{
    ull *Array;       // Pointer to the array
    size_t ArraySize; // Size of the array
};

static inline void *
pthread_insertionSort(void *arrayMetaDataToUnpack) // arrayMetaDataToUnpack contains the pointer used to pass the data to the func
{
    // UNPACKING START
    ull *_Array = ((InsertionSortPacking *)arrayMetaDataToUnpack)->Array;
    size_t sizeOfArray = ((InsertionSortPacking *)arrayMetaDataToUnpack)->ArraySize;
    // UNPACKING END

    // Compulsory use of reinterpret_cast to keep everything consistent
    ull *__tempArray = reinterpret_cast<ull *>(_Array);

    // Custom func to get the number of cores on machine
    int numberOfCores = getNumCores();
    std::cout << "Number of cores detected: " << numberOfCores << ".\n";

    // Step 1, create vars
    pthread_t threads[numberOfCores];              // all threads must run, step 4
    int rows = numberOfCores - 1;                  // ... but not every thread might be bound to have equal job ( equal sub array sizes ), step 5
    int cols = floor(sizeOfArray / numberOfCores); // the most evenly contained size possible, step 6
    ull subArrays[rows][cols]{0u};                 // the {} initializes everything for me, step 6

    // step 7
    int lastArraySize = (sizeOfArray / numberOfCores) + (sizeOfArray % numberOfCores);
    ull lastArray[lastArraySize];

    // step 8
    for (int i = 0; i < rows; ++i)
        for (int j = 0; j < cols; ++j)
            subArrays[i][j] = __tempArray[i * numberOfCores + j];

    // step 9
    for (int i = 0, j = cols * rows + 1;
         i < lastArraySize and j < sizeOfArray;
         ++i, ++j)
        lastArray[i] = __tempArray[j];

    // EXTRA, just for clarification. Individually, all work fine
    // getTimeForTemplate just prints out some logs, takes in the
    // the function pointer to the function I want to use, the array, I want to sort, the size, and a text to print
    // for (int i = 0; i < rows; ++i)
    //     getTimeForTemplate(insertionSort, subArrays[i], cols, "insertion sort - serial, subArray[" + std::to_string(i) + std::string("]"));
    // getTimeForTemplate(insertionSort, lastArray, lastArraySize, "insertion sort - serial, lastArray");

    // step 10
    for (int i = 0; i <= rows; ++i)
    {
        InsertionSortPacking __tempPacking{};

        if (i == rows) // Step 3.1, for the lastArray
        {
            __tempPacking.Array = (ull *)lastArray;
            __tempPacking.ArraySize = lastArraySize;
        }
        else // Step 3.2, for the remaining arrays
        {
            __tempPacking.Array = (ull *)subArrays[i];
            __tempPacking.ArraySize = cols;
        }

        int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);
        if (__rc)
        {
            std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl;
            exit(-1);
        }
    }

    // step 11, joining the pthreads, regardless of array size
    for (int i = 0; i <= rows; ++i)
    {
        int __rc = pthread_join(threads[i], nullptr);
        if (__rc)
        {
            std::cout << "[ERROR] Unable to join thread, rc " << __rc << " i, " << i << std::endl;
            exit(-1);
        }
    }

    // Step 12, checking if all the jobs have completed the sorting
    for (int i = 0; i <= rows; ++i)
    {
        InsertionSortPacking __tempPacking{};

        if (i == rows) // Step 3.1, for the lastArray
        {
            __tempPacking.Array = (ull *)lastArray;
            __tempPacking.ArraySize = lastArraySize;

            if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
                std::cout << "Sorting succeeded for lastArrray\n";
            else
                std::cout << "Sorting failed for lastArray\n";
        }
        else // Step 3.2, for the remaining arrays
        {
            __tempPacking.Array = (ull *)subArrays[i];
            __tempPacking.ArraySize = cols;

            if (isSorted(&__tempPacking) == -1) // Sorting succeeded if -1 returned
                std::cout << "Sorting succeeded for subArray[" << i << "]\n";
            else
                std::cout << "Sorting failed for subArray[" << i << "]\n";
        }
    }

    ...
    // further code for merging and whatnot

    return sortedArray;

以及我用来编译和 运行、

的命令
g++ -std=gnu++17 -std=c++17 -O2 insertion.cpp -o insertion -lpthread && ./insertion > log.log

这是完整程序的一些日志,https://gist.github.com/Rubix982/d0bbdabc2d8a1fc531e9104f7172dbfe

我有什么问题,为什么我不能解决它们?

  1. 这看起来根本不像是竞争条件。每个数组在内存中都是不同且独立的。没有两个线程访问序列中任何地方的不同线程
  2. ...可能是线程有时在部分排序时连接在一起 - 这会发生吗?
  3. 我没有 运行我的机器可以处理更多的线程(肯定是 4 个内核)
  4. 我不明白在哪里以及如何调试为什么有时 1 个线程失败,或者为什么 3 个线程有时失败
  5. 我看不出这里需要互斥锁。没有比赛条件,但也许我从错误的角度思考这个问题
  6. 我尝试使用 sleep() 来确保线程在适当的时间完成工作,然后再将它们连接在一起,但无济于事。

结论

如有任何帮助,我们将不胜感激!请让我知道我可能在哪里出错,以及调试和修复此问题的可能方法。 我知道我不应该使用选择排序/插入排序来并行化,我应该使用合并排序和快速排序及其并行版本,但这只是为了好玩和学习。

谢谢!

您启动线程的方式存在重大问题:

    for (int i = 0; i <= rows; ++i)
    {
        InsertionSortPacking __tempPacking{};

请注意,__tempPacking 的生命周期是主机循环的一次迭代,但此处...

[...]

        int __rc = pthread_create(&threads[i], nullptr, insertionSort, (void *)&__tempPacking);

...您将指向 __tempPacking 的指针传递给新线程。一旦该对象的生命周期结束,新线程不得尝试访问它,但即使 insertionSort() 对它所做的所有操作都是将成员读回局部变量,也没有任何东西强制在对象的生命周期内完成。这是一般意义上的竞争条件,即使不是语言标准定义意义上的“数据竞争”,当排​​序线程丢失时,结果行为是未定义的。

        if (__rc)
        {
            std::cout << "[ERROR] Unable to create thread, rc " << __rc << " i, " << i << std::endl;
            exit(-1);
        }
    }

你继续评论

  1. This does not seem like a race condition at all. Each array is different and independent in memory. No two threads access a different thread anywhere in the sequence

见上文。确实存在竞争条件,只是与您正在查看的对象无关。

  1. ... It could be that threads are sometimes joined when they are partially sorted - can that happen?

有了UB,一切皆有可能。在没有 UB 的情况下,加入线程会导致加入者等待线程函数终止。你还没有展示它,所以我不能说你的 insertionSort() 函数是否容易在不对数组进行完全排序的情况下终止,但这不是它在 [=54= 中使用的特定特征] 程序。

  1. I'm not running more threads than my machine can handle ( 4 cores for sure )

这并不重要。您可以拥有比内核更多的线程;他们只是不会同时 运行。

  1. I do not understand where to and how to debug why sometimes 1 thread fails, or why 3 threads fail the other time

是的,调试多线程程序可能非常具有挑战性。许多调试器可以 运行 多线程代码并询问不同线程的状态,但 threading-related 错误特别容易对执行环境敏感,因此当 运行 在一个调试器。

  1. I do not see the need for mutex locks here at all. No race conditions, but maybe I'm thinking about this from the wrong angle

您可以避免使用同步对象,方法是确保提供给每个线程的 InsertionSortPacking 对象比该线程长,同时保留 属性 每个线程都有自己的、不同的对象。

或者,您可以动态分配它们并让排序线程负责删除它们。

  1. I tried using sleep() to make sure the threads get their work done in due time BEFORE joining them together, but to no avail.

有时 sleep() 可以解决同步问题,但这绝不是真正的解决方案。如果存在竞争条件,那么 sleep() 可能会扭曲每个竞争对手获胜的可能性,但这并不能解决首先存在竞争的问题。

在这种情况下,如果您将 sleep() 放在 thread-launching 循环的末尾而不是之后,它实际上可能会阻止问题的出现,但这会再次破坏您的目的,这不是真正的解决方案。