从基于线程的流水线转移到基于任务的并行? (C++)
Moving from thread-based pipelining to task-based parallelism? (C++)
我正在研究如何将一些现有的 C++ 代码从基于线程的并行性迁移到基于任务的并行性,以及这种迁移是否可取。这是我的场景:
假设我有一些函数要在某个事件上执行。假设我有一台相机,每次到达一帧时我都想做一些繁重的处理并保存结果。一些处理是串行的,所以如果我只是在同一个线程中串行处理每一帧,我就不会得到完整的 CPU 利用率。假设帧每 33 毫秒到达一次,并且帧的处理延迟接近 100 毫秒。
因此,在我当前的实现中,我创建了 3 个处理帧的线程,并以循环方式将每个新帧分配给其中一个工作线程。所以线程 T0 可能会处理帧 F0、F3、F6 等。现在我得到了充分的 CPU 利用率,而且我不必丢帧来保持实时速率。
由于处理需要各种大的临时资源,我可以预先为每个工作线程分配这些资源。因此不必为每一帧重新分配它们。这种每线程资源的策略对于粒度非常有效:如果它们是按帧分配的,这将花费太长时间,但如果有更多的工作线程,我们就会耗尽资源。
我看不到使用标准 C++11 或 Microsoft 的 PPL 库将这种基于线程的并行性替换为基于任务的并行性的方法。如果有一个可以在下面勾勒出的这样做的模式,我会很乐意学习它。
问题是在哪里存储状态 - 分配的临时资源(例如 GPU 内存) - 可以重新用于后续帧,但不得与当前处理帧的资源冲突。
在这种情况下是否需要迁移到基于任务的并行机制?
我想通了。这是一个示例解决方案:
#include <iostream>
#include <ppltasks.h>
#include <thread>
#include <vector>
using PipelineState = int;
using PipelineStateArg = std::shared_ptr<PipelineState>;
using FrameState = int;
struct Pipeline
{
PipelineStateArg state;
concurrency::task<void> task;
};
std::vector<Pipeline> pipelines;
void proc(const FrameState& fs, PipelineState& ps)
{
std::cout << "Process frame " << fs << " in pipeline " << ps << std::endl;
}
void on_frame(int index)
{
FrameState frame = index;
if (index < 2)
{
// Start a new pipeline
auto state = std::make_shared<PipelineState>(index);
pipelines.push_back({state, concurrency::create_task([=]()
{
proc(frame, *state);
})});
}
else
{
// Use an existing pipeline
auto& pipeline = pipelines[index & 1];
auto state = pipeline.state;
pipeline.task = pipeline.task.then([=]()
{
proc(frame, *state);
});
}
}
void main()
{
for (int i = 0; i < 100; ++i)
{
on_frame(i);
std::this_thread::sleep_for(std::chrono::milliseconds(33));
}
for (auto& pipeline : pipelines)
pipeline.tail.wait();
}
我正在研究如何将一些现有的 C++ 代码从基于线程的并行性迁移到基于任务的并行性,以及这种迁移是否可取。这是我的场景:
假设我有一些函数要在某个事件上执行。假设我有一台相机,每次到达一帧时我都想做一些繁重的处理并保存结果。一些处理是串行的,所以如果我只是在同一个线程中串行处理每一帧,我就不会得到完整的 CPU 利用率。假设帧每 33 毫秒到达一次,并且帧的处理延迟接近 100 毫秒。
因此,在我当前的实现中,我创建了 3 个处理帧的线程,并以循环方式将每个新帧分配给其中一个工作线程。所以线程 T0 可能会处理帧 F0、F3、F6 等。现在我得到了充分的 CPU 利用率,而且我不必丢帧来保持实时速率。
由于处理需要各种大的临时资源,我可以预先为每个工作线程分配这些资源。因此不必为每一帧重新分配它们。这种每线程资源的策略对于粒度非常有效:如果它们是按帧分配的,这将花费太长时间,但如果有更多的工作线程,我们就会耗尽资源。
我看不到使用标准 C++11 或 Microsoft 的 PPL 库将这种基于线程的并行性替换为基于任务的并行性的方法。如果有一个可以在下面勾勒出的这样做的模式,我会很乐意学习它。
问题是在哪里存储状态 - 分配的临时资源(例如 GPU 内存) - 可以重新用于后续帧,但不得与当前处理帧的资源冲突。
在这种情况下是否需要迁移到基于任务的并行机制?
我想通了。这是一个示例解决方案:
#include <iostream>
#include <ppltasks.h>
#include <thread>
#include <vector>
using PipelineState = int;
using PipelineStateArg = std::shared_ptr<PipelineState>;
using FrameState = int;
struct Pipeline
{
PipelineStateArg state;
concurrency::task<void> task;
};
std::vector<Pipeline> pipelines;
void proc(const FrameState& fs, PipelineState& ps)
{
std::cout << "Process frame " << fs << " in pipeline " << ps << std::endl;
}
void on_frame(int index)
{
FrameState frame = index;
if (index < 2)
{
// Start a new pipeline
auto state = std::make_shared<PipelineState>(index);
pipelines.push_back({state, concurrency::create_task([=]()
{
proc(frame, *state);
})});
}
else
{
// Use an existing pipeline
auto& pipeline = pipelines[index & 1];
auto state = pipeline.state;
pipeline.task = pipeline.task.then([=]()
{
proc(frame, *state);
});
}
}
void main()
{
for (int i = 0; i < 100; ++i)
{
on_frame(i);
std::this_thread::sleep_for(std::chrono::milliseconds(33));
}
for (auto& pipeline : pipelines)
pipeline.tail.wait();
}