每次调用的工作负载非常低的连续并行计算
Consecutive parallel computation with very low workload per invocation
假设我想并行计算以下示例中的方法 A
和 B
:
while (true)
{
int state = NextState();
int a = A(state);
int b = B(state);
ImportantMethod(a, b);
}
NextState
在此示例中无法提前计算。
如您所见,方法 A
和 B
的参数取决于状态。结果值 a
和 b
用于调用每次迭代都必须调用的 ImportantMethod
。
这需要多线程代码并行调用 A
和 B
并在单次迭代中等待它们的结果。不可能合并连续的迭代来创建更大的工作量。
ImportantMethod
用于实时应用程序,目前调用频率太低。
A
和 B
方法的工作量很小。大约 10 次乘法和 2 次三角函数(正弦、余弦),只是为了让大家明白这一点。
A
和 B
是瓶颈,ImportantMethod
只需要循环非常快。
我考虑过为 A
和 B
使用两个线程,并在每次迭代时使用条件变量唤醒它们。但是考虑到小的工作量,我担心唤醒线程和等待它们的结果的开销比计算本身要大。我也可以尝试在没有任何同步的情况下使用带有布尔标志的忙等待。那会很好地占用 cpu,但这是可以接受的。
我已经大大简化了问题,但我不认为我可以创建一个测试应用程序来提供准确的性能指标,与真实应用程序的性能指标相当。不过在实际应用中实现它会非常复杂,所以我想在尝试之前获得更多知识。
因此我想问一下是否有人有过这类问题的经验。特别是对于非常频繁的线程唤醒和同步。
忙等待是否比使用条件变量通知更有利?
还有其他我尚未考虑过的同步多个线程的方法可能更适合吗?
对于同步开销的基线,您可以使用 std::async
开始第一个 A
计算(因为这只是一个小的代码更改),并测量整体速度影响(与顺序码)。
之前:
int a = A(state);
int b = B(state);
ImportantMethod(a, b);
之后:
#include <future>
auto a = std::async(std::launch::async, A, state);
int b = B(state);
ImportantMethod(a.get(), b);
但是,对于您描述的运算(10 次乘法,加上两次 sin/cos 运算),我怀疑它是否会有所改进。我假设您已经使用了一些优化的 sin/cos 实现(预先计算的表也值得尝试)。
请注意,即使是 single-threaded 代码,如今的 CPU 已经在后台进行了大量并行计算。如果 A
和 B
是简单的函数,编译器应该有很大的优化潜力(例如,矢量化)。
同步操作(一旦开始使用线程就无法避免)的成本可能很高。两个线程都必须在 state
对象的内存上同步。要将 A
(或 B
)的结果从一个线程传输到另一个线程,您可以在这种情况下使用原子操作。如果直接轮询它,则需要用非法值对其进行初始化,以便检测写入时间。否则,您需要另外设置一些标志。
我怀疑同步开销是否可以小于直接计算值的成本。
好的,我现在做了一些研究。也许我的结果对某些人来说会很有趣。
请记住,下面代码的某些部分是 windows 特定的(准确地说是线程关联和睡眠)。
看来您可以并行地更快地处理非常小的工作负载。但它基本上需要使用忙等待来停止一些(或所有)内核。否则我无法实现任何加速。设置线程亲和性也非常重要,这样线程就可以在特定的核心上执行。在我的测试中,不这样做会再次导致速度慢于顺序对应物。
我想到的是:
- 将主线程关联设置为特定核心
- 生成 N 个工作线程并将这些关联设置为不同的核心
- 在工作线程中忙于等待,直到计算作业到达
- 使用主线程将作业分配给工作线程,并进行某种忙循环来检查是否有任何工作线程完成了执行
这导致所有参与的核心(包括主线程核心)在计算时达到 100% 的最大值。工作线程不会在连续的并行调用之间终止以节省线程启动开销。
请注意,至少在 windows 上,您必须等待一段时间才能 windows 将线程正确移动到请求的核心。我用睡眠做到了。下图显示了我在测试期间(60 秒)cpu 的使用情况。
我在第一个核心的图上标出了几个位置:
- Windows 将所有工作线程移至它们的特定核心。 Core#0 不再被占用。
- 任务调度程序在 Core#0 上启动,并行计算已经开始。
- 任务调度程序已完成,所有其他核心也将恢复正常。
- 顺序计算已经开始
- 顺序计算完成
我测试了两种不同并行度的时序:
32个任务(基本上并行最多7个,上图是在本次测试中抓取的):
并行:2.7s
连续:7.1s
2个任务(2个并行,其他核心忙等待(浪费)):
并行:0.365s
连续:0.464s
如您所见,甚至在 2 个任务中也有一些加速。这不是一半的时间,但考虑到工作量小,我想这还不错。我其实很高兴,高并行度的表现非常好。请记住,工作量仍然很小,在所有任务完成后,所有内容都会在开始下一次迭代之前同步。
权衡是,只要可以进行并行计算,所有参与的核心都会被完全阻塞。
对于任何感兴趣的人,这是我的测试代码:
#include <iostream>
#include <thread>
#include "windows.h"
//Object that can compute something to simulate workload
class ComputeObject
{
public:
float A;
float B;
float C;
void Compute()
{
//Do some calculations that approximately match the small workload
C = float(sin(A)) + float(cos(B));
C = C * A + atan2(A, B);
C /= A + B;
}
};
//Stores some information for the worker thread that is responsible for this task
struct Task
{
ComputeObject* ComputeObject = nullptr; //the current compute object
bool AssignedFlag = false; //flag that specifies if the compute object has a valid object
std::thread WorkerThread; //the thread
};
//Pointer to an array of Task
Task* Tasks;
//Number of Cpus (logical cores) and number of worker tasks
int NumCpus;
int NumTask;
//Flag, that is used to stop the workers when computation is done
bool WorkersRunning;
//Main function for each worker
void TaskWorker(const int workerIndex)
{
//Assign the worker to a specific logical core.
//Skip the first one, because the scheduler is going to block that one.
SetThreadAffinityMask(GetCurrentThread(), 1 << (workerIndex + 1));
//Get pointer to task struct for current worker
const auto task = Tasks + workerIndex;
while (WorkersRunning)
{
while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
task->ComputeObject->Compute(); //Do computation
task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
}
}
//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numComputeObjects;
const auto tasksStart = Tasks;
const auto tasksEnd = Tasks + NumTask;
auto currentComputeObject = computeObjectsStart;
auto currentTask = tasksStart;
//as long as there are still ComputeObjects to be processed
while (currentComputeObject != computeObjectsEnd)
{
if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
{
currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
}
currentTask++; //advance to the next task
if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
}
}
int main()
{
//get number of logical cores
NumCpus = int(std::thread::hardware_concurrency());
NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
Tasks = new Task[NumTask];
const auto numParallelWork = 32; //number of computations that can be done in parallel
const int numInvocations = 1e6; //number of invocations for time measurement
//create ComputeObjects array and compute start/end pointers
const auto computeObjects = new ComputeObject[numParallelWork];
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numParallelWork;
//fill ComputeObjects with random data
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
{
computeObject->A = float(rand()) / RAND_MAX;
computeObject->B = float(rand()) / RAND_MAX;
}
//set workers running
WorkersRunning = true;
//spawn workers
for (auto i = 0; i < NumTask; i++)
Tasks[i].WorkerThread = std::thread(TaskWorker, i);
//put this thread to first logical core
SetThreadAffinityMask(GetCurrentThread(), 1 << 0);
//wait 20s to allow windows to actually move the threads to the specified cores
//monitor task manager to ensure windows actually did that
Sleep(20000);
std::chrono::steady_clock::time_point start, end;
std::chrono::duration<double> elapsed;
start = std::chrono::steady_clock::now(); //start time measurement
//invoke task scheduler a few times
for (auto i = 0; i < numInvocations; i++)
TaskScheduler(computeObjects, numParallelWork);
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "parallel: " << elapsed.count() << "s" << std::endl;
//stop workers and wait for all threads
WorkersRunning = false;
for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();
//wait 10 seconds just for good measures
Sleep(10000);
start = std::chrono::steady_clock::now(); //start time measurement
//invoke sequential loop a few times
for (auto i = 0; i < numInvocations; i++)
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
computeObject->Compute();
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
}
假设我想并行计算以下示例中的方法 A
和 B
:
while (true)
{
int state = NextState();
int a = A(state);
int b = B(state);
ImportantMethod(a, b);
}
NextState
在此示例中无法提前计算。
如您所见,方法 A
和 B
的参数取决于状态。结果值 a
和 b
用于调用每次迭代都必须调用的 ImportantMethod
。
这需要多线程代码并行调用 A
和 B
并在单次迭代中等待它们的结果。不可能合并连续的迭代来创建更大的工作量。
ImportantMethod
用于实时应用程序,目前调用频率太低。
A
和 B
方法的工作量很小。大约 10 次乘法和 2 次三角函数(正弦、余弦),只是为了让大家明白这一点。
A
和 B
是瓶颈,ImportantMethod
只需要循环非常快。
我考虑过为 A
和 B
使用两个线程,并在每次迭代时使用条件变量唤醒它们。但是考虑到小的工作量,我担心唤醒线程和等待它们的结果的开销比计算本身要大。我也可以尝试在没有任何同步的情况下使用带有布尔标志的忙等待。那会很好地占用 cpu,但这是可以接受的。
我已经大大简化了问题,但我不认为我可以创建一个测试应用程序来提供准确的性能指标,与真实应用程序的性能指标相当。不过在实际应用中实现它会非常复杂,所以我想在尝试之前获得更多知识。
因此我想问一下是否有人有过这类问题的经验。特别是对于非常频繁的线程唤醒和同步。
忙等待是否比使用条件变量通知更有利? 还有其他我尚未考虑过的同步多个线程的方法可能更适合吗?
对于同步开销的基线,您可以使用 std::async
开始第一个 A
计算(因为这只是一个小的代码更改),并测量整体速度影响(与顺序码)。
之前:
int a = A(state);
int b = B(state);
ImportantMethod(a, b);
之后:
#include <future>
auto a = std::async(std::launch::async, A, state);
int b = B(state);
ImportantMethod(a.get(), b);
但是,对于您描述的运算(10 次乘法,加上两次 sin/cos 运算),我怀疑它是否会有所改进。我假设您已经使用了一些优化的 sin/cos 实现(预先计算的表也值得尝试)。
请注意,即使是 single-threaded 代码,如今的 CPU 已经在后台进行了大量并行计算。如果 A
和 B
是简单的函数,编译器应该有很大的优化潜力(例如,矢量化)。
同步操作(一旦开始使用线程就无法避免)的成本可能很高。两个线程都必须在 state
对象的内存上同步。要将 A
(或 B
)的结果从一个线程传输到另一个线程,您可以在这种情况下使用原子操作。如果直接轮询它,则需要用非法值对其进行初始化,以便检测写入时间。否则,您需要另外设置一些标志。
我怀疑同步开销是否可以小于直接计算值的成本。
好的,我现在做了一些研究。也许我的结果对某些人来说会很有趣。 请记住,下面代码的某些部分是 windows 特定的(准确地说是线程关联和睡眠)。
看来您可以并行地更快地处理非常小的工作负载。但它基本上需要使用忙等待来停止一些(或所有)内核。否则我无法实现任何加速。设置线程亲和性也非常重要,这样线程就可以在特定的核心上执行。在我的测试中,不这样做会再次导致速度慢于顺序对应物。
我想到的是:
- 将主线程关联设置为特定核心
- 生成 N 个工作线程并将这些关联设置为不同的核心
- 在工作线程中忙于等待,直到计算作业到达
- 使用主线程将作业分配给工作线程,并进行某种忙循环来检查是否有任何工作线程完成了执行
这导致所有参与的核心(包括主线程核心)在计算时达到 100% 的最大值。工作线程不会在连续的并行调用之间终止以节省线程启动开销。
请注意,至少在 windows 上,您必须等待一段时间才能 windows 将线程正确移动到请求的核心。我用睡眠做到了。下图显示了我在测试期间(60 秒)cpu 的使用情况。
我在第一个核心的图上标出了几个位置:
- Windows 将所有工作线程移至它们的特定核心。 Core#0 不再被占用。
- 任务调度程序在 Core#0 上启动,并行计算已经开始。
- 任务调度程序已完成,所有其他核心也将恢复正常。
- 顺序计算已经开始
- 顺序计算完成
我测试了两种不同并行度的时序:
32个任务(基本上并行最多7个,上图是在本次测试中抓取的):
并行:2.7s
连续:7.1s
2个任务(2个并行,其他核心忙等待(浪费)):
并行:0.365s
连续:0.464s
如您所见,甚至在 2 个任务中也有一些加速。这不是一半的时间,但考虑到工作量小,我想这还不错。我其实很高兴,高并行度的表现非常好。请记住,工作量仍然很小,在所有任务完成后,所有内容都会在开始下一次迭代之前同步。 权衡是,只要可以进行并行计算,所有参与的核心都会被完全阻塞。
对于任何感兴趣的人,这是我的测试代码:
#include <iostream>
#include <thread>
#include "windows.h"
//Object that can compute something to simulate workload
class ComputeObject
{
public:
float A;
float B;
float C;
void Compute()
{
//Do some calculations that approximately match the small workload
C = float(sin(A)) + float(cos(B));
C = C * A + atan2(A, B);
C /= A + B;
}
};
//Stores some information for the worker thread that is responsible for this task
struct Task
{
ComputeObject* ComputeObject = nullptr; //the current compute object
bool AssignedFlag = false; //flag that specifies if the compute object has a valid object
std::thread WorkerThread; //the thread
};
//Pointer to an array of Task
Task* Tasks;
//Number of Cpus (logical cores) and number of worker tasks
int NumCpus;
int NumTask;
//Flag, that is used to stop the workers when computation is done
bool WorkersRunning;
//Main function for each worker
void TaskWorker(const int workerIndex)
{
//Assign the worker to a specific logical core.
//Skip the first one, because the scheduler is going to block that one.
SetThreadAffinityMask(GetCurrentThread(), 1 << (workerIndex + 1));
//Get pointer to task struct for current worker
const auto task = Tasks + workerIndex;
while (WorkersRunning)
{
while (!task->AssignedFlag && WorkersRunning); //Wait as long as no valid ComputeObject is set or the workers are stopped.
if (!WorkersRunning) break; //Get out of the loop when workers are stopped.
task->ComputeObject->Compute(); //Do computation
task->AssignedFlag = false; //Invalidate current ComputeObject, so that a new one can be assigned from the scheduler
}
}
//The scheduler runs on the main thread and constantly checks whether workers are finished with their ComputeObject and assigns new ones
void TaskScheduler(ComputeObject* computeObjects, const int numComputeObjects)
{
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numComputeObjects;
const auto tasksStart = Tasks;
const auto tasksEnd = Tasks + NumTask;
auto currentComputeObject = computeObjectsStart;
auto currentTask = tasksStart;
//as long as there are still ComputeObjects to be processed
while (currentComputeObject != computeObjectsEnd)
{
if (!currentTask->AssignedFlag) //if current task has no valid ComputeObject yet
{
currentTask->ComputeObject = currentComputeObject++; //assign new computeObject and advance
currentTask->AssignedFlag = true; //set flag to signal that a ComputeObject has been assigned
}
currentTask++; //advance to the next task
if (currentTask == tasksEnd) currentTask = tasksStart; //go back to the first task if the last task was reached
}
}
int main()
{
//get number of logical cores
NumCpus = int(std::thread::hardware_concurrency());
NumTask = NumCpus - 1; //first one is this thread and is going to be blocked by the scheduler
Tasks = new Task[NumTask];
const auto numParallelWork = 32; //number of computations that can be done in parallel
const int numInvocations = 1e6; //number of invocations for time measurement
//create ComputeObjects array and compute start/end pointers
const auto computeObjects = new ComputeObject[numParallelWork];
const auto computeObjectsStart = computeObjects;
const auto computeObjectsEnd = computeObjects + numParallelWork;
//fill ComputeObjects with random data
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
{
computeObject->A = float(rand()) / RAND_MAX;
computeObject->B = float(rand()) / RAND_MAX;
}
//set workers running
WorkersRunning = true;
//spawn workers
for (auto i = 0; i < NumTask; i++)
Tasks[i].WorkerThread = std::thread(TaskWorker, i);
//put this thread to first logical core
SetThreadAffinityMask(GetCurrentThread(), 1 << 0);
//wait 20s to allow windows to actually move the threads to the specified cores
//monitor task manager to ensure windows actually did that
Sleep(20000);
std::chrono::steady_clock::time_point start, end;
std::chrono::duration<double> elapsed;
start = std::chrono::steady_clock::now(); //start time measurement
//invoke task scheduler a few times
for (auto i = 0; i < numInvocations; i++)
TaskScheduler(computeObjects, numParallelWork);
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "parallel: " << elapsed.count() << "s" << std::endl;
//stop workers and wait for all threads
WorkersRunning = false;
for (auto i = 0; i < NumTask; i++) Tasks[i].WorkerThread.join();
//wait 10 seconds just for good measures
Sleep(10000);
start = std::chrono::steady_clock::now(); //start time measurement
//invoke sequential loop a few times
for (auto i = 0; i < numInvocations; i++)
for (auto computeObject = computeObjectsStart; computeObject < computeObjectsEnd; computeObject++)
computeObject->Compute();
end = std::chrono::steady_clock::now(); //end time measurement
elapsed = end - start;
std::cout << "sequential: " << elapsed.count() << "s" << std::endl;
}