每次调用的工作负载非常低的连续并行计算

Consecutive parallel computation with very low workload per invocation

假设我想并行计算以下示例中的方法 AB

while (true)
{
   int state = NextState();

   int a = A(state);
   int b = B(state);

   ImportantMethod(a, b);
}

NextState 在此示例中无法提前计算。 如您所见,方法 AB 的参数取决于状态。结果值 ab 用于调用每次迭代都必须调用的 ImportantMethod

这需要多线程代码并行调用 AB 并在单次迭代中等待它们的结果。不可能合并连续的迭代来创建更大的工作量。

ImportantMethod 用于实时应用程序,目前调用频率太低。 AB 方法的工作量很小。大约 10 次乘法和 2 次三角函数(正弦、余弦),只是为了让大家明白这一点。 AB 是瓶颈,ImportantMethod 只需要循环非常快。

我考虑过为 AB 使用两个线程,并在每次迭代时使用条件变量唤醒它们。但是考虑到小的工作量,我担心唤醒线程和等待它们的结果的开销比计算本身要大。我也可以尝试在没有任何同步的情况下使用带有布尔标志的忙等待。那会很好地占用 cpu,但这是可以接受的。

我已经大大简化了问题,但我不认为我可以创建一个测试应用程序来提供准确的性能指标,与真实应用程序的性能指标相当。不过在实际应用中实现它会非常复杂,所以我想在尝试之前获得更多知识。

因此我想问一下是否有人有过这类问题的经验。特别是对于非常频繁的线程唤醒和同步。

忙等待是否比使用条件变量通知更有利? 还有其他我尚未考虑过的同步多个线程的方法可能更适合吗?

对于同步开销的基线,您可以使用 std::async 开始第一个 A 计算(因为这只是一个小的代码更改),并测量整体速度影响(与顺序码)。

之前:

int a = A(state);
int b = B(state);
ImportantMethod(a, b);

之后:

#include <future>

auto a = std::async(std::launch::async, A, state);
int b = B(state);
ImportantMethod(a.get(), b);

但是,对于您描述的运算(10 次乘法,加上两次 sin/cos 运算),我怀疑它是否会有所改进。我假设您已经使用了一些优化的 sin/cos 实现(预先计算的表也值得尝试)。

请注意,即使是 single-threaded 代码,如今的 CPU 已经在后台进行了大量并行计算。如果 AB 是简单的函数,编译器应该有很大的优化潜力(例如,矢量化)。

同步操作(一旦开始使用线程就无法避免)的成本可能很高。两个线程都必须在 state 对象的内存上同步。要将 A(或 B)的结果从一个线程传输到另一个线程,您可以在这种情况下使用原子操作。如果直接轮询它,则需要用非法值对其进行初始化,以便检测写入时间。否则,您需要另外设置一些标志。

我怀疑同步开销是否可以小于直接计算值的成本。

好的,我现在做了一些研究。也许我的结果对某些人来说会很有趣。 请记住,下面代码的某些部分是 windows 特定的(准确地说是线程关联和睡眠)。

看来您可以并行地更快地处理非常小的工作负载。但它基本上需要使用忙等待来停止一些(或所有)内核。否则我无法实现任何加速。设置线程亲和性也非常重要,这样线程就可以在特定的核心上执行。在我的测试中,不这样做会再次导致速度慢于顺序对应物。

我想到的是:

  1. 将主线程关联设置为特定核心
  2. 生成 N 个工作线程并将这些关联设置为不同的核心
  3. 在工作线程中忙于等待,直到计算作业到达
  4. 使用主线程将作业分配给工作线程,并进行某种忙循环来检查是否有任何工作线程完成了执行

这导致所有参与的核心(包括主线程核心)在计算时达到 100% 的最大值。工作线程不会在连续的并行调用之间终止以节省线程启动开销。

请注意,至少在 windows 上,您必须等待一段时间才能 windows 将线程正确移动到请求的核心。我用睡眠做到了。下图显示了我在测试期间(60 秒)cpu 的使用情况。

我在第一个核心的图上标出了几个位置:

  1. Windows 将所有工作线程移至它们的特定核心。 Core#0 不再被占用。
  2. 任务调度程序在 Core#0 上启动,并行计算已经开始。
  3. 任务调度程序已完成,所有其他核心也将恢复正常。
  4. 顺序计算已经开始
  5. 顺序计算完成

我测试了两种不同并行度的时序:


32个任务(基本上并行最多7个,上图是在本次测试中抓取的):

并行:2.7s

连续:7.1s


2个任务(2个并行,其他核心忙等待(浪费)):

并行:0.365s

连续:0.464s


如您所见,甚至在 2 个任务中也有一些加速。这不是一半的时间,但考虑到工作量小,我想这还不错。我其实很高兴,高并行度的表现非常好。请记住,工作量仍然很小,在所有任务完成后,所有内容都会在开始下一次迭代之前同步。 权衡是,只要可以进行并行计算,所有参与的核心都会被完全阻塞。

对于任何感兴趣的人,这是我的测试代码:

#include <iostream>
#include <thread>
#include "windows.h"

//Object that can compute something to simulate workload
class ComputeObject
{
public:
  float A;
  float B;
  float C;

  void Compute()
  {
    //Do some calculations that approximately match the small workload
    C = float(sin(A)) + float(cos(B));
    C = C * A + atan2(A, B);
    C /= A + B;
  }
};

//Stores some information for the worker thread that is responsible for this task
struct Task
{
  ComputeObject* ComputeObject = nullptr; //the current compute object
  bool AssignedFlag = false; //flag that specifies if the compute object has a valid object
  std::thread WorkerThread; //the thread
};

//Pointer to an array of Task
Task* Tasks;

//Number of Cpus (logical cores) and number of worker tasks
int NumCpus;
int NumTask;

//Flag, that is used to stop the workers when computation is done
bool WorkersRunning;

//Main function for each worker
void TaskWorker(const int workerIndex)
{
  //Assign the worker to a specific logical core.
  //Skip the first one, because the scheduler is going to block that one.
  SetThreadAffinityMask(GetCurrentThread(), 1 << (workerIndex + 1));

  //Get pointer to task struct for current worker
  const auto task = Tasks + workerIndex;
  while (WorkersRunning)
  {
    while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
    if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
    task->ComputeObject->Compute(); //Do computation
    task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
  }
}

//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
  const auto computeObjectsStart = computeObjects;
  const auto computeObjectsEnd = computeObjects + numComputeObjects;
  const auto tasksStart = Tasks;
  const auto tasksEnd = Tasks + NumTask;

  auto currentComputeObject = computeObjectsStart;
  auto currentTask = tasksStart;

  //as long as there are still ComputeObjects to be processed
  while (currentComputeObject != computeObjectsEnd)
  {
    if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
    {
      currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
      currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
    }

    currentTask++; //advance to the next task
    if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
  }
}

int main()
{
  //get number of logical cores
  NumCpus = int(std::thread::hardware_concurrency());
  NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
  Tasks = new Task[NumTask];


  const auto numParallelWork = 32; //number of computations that can be done in parallel
  const int numInvocations = 1e6; //number of invocations for time measurement

  //create ComputeObjects array and compute start/end pointers
  const auto computeObjects = new ComputeObject[numParallelWork];
  const auto computeObjectsStart = computeObjects;
  const auto computeObjectsEnd = computeObjects + numParallelWork;

  //fill ComputeObjects with random data
  for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
  {
    computeObject->A = float(rand()) / RAND_MAX;
    computeObject->B = float(rand()) / RAND_MAX;
  }

  //set workers running
  WorkersRunning = true;

  //spawn workers
  for (auto i = 0; i < NumTask; i++)
    Tasks[i].WorkerThread = std::thread(TaskWorker, i);

  //put this thread to first logical core
  SetThreadAffinityMask(GetCurrentThread(), 1 << 0);

  //wait 20s to allow windows to actually move the threads to the specified cores
  //monitor task manager to ensure windows actually did that
  Sleep(20000);

  std::chrono::steady_clock::time_point start, end;
  std::chrono::duration<double> elapsed;



  start = std::chrono::steady_clock::now(); //start time measurement

  //invoke task scheduler a few times
  for (auto i = 0; i < numInvocations; i++)
    TaskScheduler(computeObjects, numParallelWork);

  end = std::chrono::steady_clock::now(); //end time measurement
  elapsed = end - start;
  std::cout << "parallel: " << elapsed.count() << "s" << std::endl;


  //stop workers and wait for all threads
  WorkersRunning = false;
  for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();


  //wait 10 seconds just for good measures
  Sleep(10000);


  start = std::chrono::steady_clock::now(); //start time measurement

  //invoke sequential loop a few times
  for (auto i = 0; i < numInvocations; i++)
    for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
      computeObject->Compute();

  end = std::chrono::steady_clock::now(); //end time measurement
  elapsed = end - start;
  std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
}