join_node 图流构建
join_node graph flow construction
我正在试验来自 TBB 的 Intel Graph Flow。我对结果非常满意,我发现这个产品非常棒,具有无限的可能性。然而,我遇到了一个我修复的 pb,但我不满意。 pb如下
message
A ----------\ tuple<message,message> WHATEVER
message join ------------------------- C------------------------
B ----------/
当我们想要同步并避免将消息(及其值)传播 n 次时应用此模式。英特尔提供了一个很好解释 pb 的例子(和解决方案 - Intel example)。我的pb是使用静态方法构造的元组和图的构造。它是完全静态的,特别是如果连接节点的输入边数(input_port<i>
在英特尔示例中)是变量。
TBB-graph 流的大师知道这个 pb 的 "dynamic approach" 吗?
最佳,
蒂姆
[编辑我的代码真实 pb]
我能做到:
std::vector<tbb::flow::function_node<std::size_t, message>> vec_node;
for (int i(0) ; i < 3 ;++i)
nodes_cont_.emplace_back(my_amazing_function_to_create_node(g_));
tbb::flow::make_edge(vec_node[0], tbb::flow::input_port<0>
tbb::flow::make_edge(vec_node[1], tbb::flow::input_port<1>(node_join_));
tbb::flow::make_edge(vec_node[2], tbb::flow::input_port<2>(node_join_));
我做不到:
for(int i(0); i < vec_node.size(); ++i)
tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));
由于 "tuple" 和 "tbb::flow::input_port" 函数。
join
节点上的端口数量是静态的(在编译时确定。)如果您希望一个输出的输入数量可变,您需要能够指出哪个 "port" 消息进来了,还有它的价值。
TBB 有一个封装端口号和值的变体类型(它是 indexer_node
的输出)。如果你使用那个类型(定义一个 indexer_node
,但不要实例化它,您可以使用节点的 ::output_type
作为 multifunction_node
的输入类型(它可能有多个输出,但可以只有一个输出),并让函数multifunction_node
的主体决定它何时具有正确数量的输出,然后您可以在输入时存储值,并且当 multifunction_node
看到 "correct " 输入数量时,它可以构造一个输出值并将其转发给它的后继者。
图形将如下所示:
我看到的一个问题是您必须定义 multifunction_node
的输出类型。这也是一个静态声明,尽管变体元组可能是您在那里需要的。
EDIT:
让我们做一些简化的假设:
- 虽然
N
在编译时未知,但在运行时已知且不变。放宽此约束将需要在每条消息中传递一些额外的数据。
- 尽管您使用的是
tuple
,但我相信那是因为 join_nodes
的输出是 tuple
(我尝试将向量添加为特例,但我不'认为这是一个选项。)我假设向量中的所有 function_nodes
都与输出具有相同的类型。这样我们就可以避免使用变体类型。
- 我们传递的数据不是特别大(复制构造不是特别昂贵。)放宽此约束将需要更加小心地访问每个节点中的数据。
- 有一些东西可以唯一地定义在一起的消息。例如,如果您将只读数据缓冲区传递给向量中的每个
function_node
,则该缓冲区的地址就是让我们知道将哪些消息放在一起的部分。
我在TBB工作已经有几年了,所以可能有些事情我不知道,但我可以给你一个草图。
图形将如下所示:
(我实际上是在勾勒出标签匹配连接的结构,因为听起来这就是你想要的。)
当你构造function_nodes
的向量时,每个function_body
都需要知道它的索引是什么。一般来说,这意味着向量是指向 function_nodes
的指针,并且每个节点都是用索引作为其参数之一构造的。
我假设 source_node's
输出类似于缓冲区。该缓冲区被传递给向量中的每个 function_node
,并且每个 function_node
的输出类型为
- 缓冲区地址
- 节点向量中
function_node
的索引
- 其他神奇的好处
multifuncton_node
是完成大部分工作的地方。它有
hash_maps
的向量,由 function_node
索引索引,并使用缓冲区地址的键,包含每个 function_node
的各种缓冲区的结果。
- a
hash_map
带有缓冲区地址的键,包含该缓冲区接收到的元素数量的计数。当它达到 N 时,你就拥有了所有的输入。
当 multifunction_node
收到消息时,它
- 将数据添加到
hash_map[i][key]
,其中i是function_node
索引(在输入消息中),key是缓冲区地址
- 增量
hash_count[key]
。如果现在是 N
,那么
- 构造一个结果值向量,从该索引的散列 table 中提取每个值。
- 如果您构建了该值,则转发该值,否则仅 return.
对于数据的传递和存储方式,以及如果您希望重用值时如何清理元素,存在一些问题,但这是基本草图。
如果您在编译时知道特定程序的 N,但想以通用方式实现图形以供不同程序使用的库,BOOST_PP 是小 N 的一个选项。
我实现了一个图表,在最慢的连接节点输出 continue_msg 之后生成 continue_msg。为此,我需要 N 个缓冲区节点并将它们连接到具有 N 个相同类型端口的连接节点 (tbb::flow::continue_msg).
基本上,下面的代码可以满足您对
的预期
for(int i(0); i < vec_node.size(); ++i)
tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));
...但是使用预编译器 "write" 多行正确的 make_edge 调用,但最多只有 N 个(对于 N < MY_JOIN_NODE_VARIADIC_MAX,该选择是任意的限制为 "small" N):
#include "boost/preprocessor/repetition/repeat_from_to.hpp"
#include "boost/preprocessor/repetition/repeat.hpp"
#include "boost/preprocessor/arithmetic/inc.hpp"
...
#define MY_JOIN_NODE_VARIADIC_MAX 8
#define MY_FUNC_IMPL(z, n, unused) tbb::flow::make_edge(vec_node[##n], tbb::flow::input_port<##n>(joinNode));
#define MY_MAKE_IMPL(z, n, unused) \
template <size_t N, typename TJoinNode> void \
makeAllEdges (TJoinNode& joinNode, \
typename std::enable_if< N == n >::type * = 0) \
{ \
BOOST_PP_REPEAT(n, MY_FUNC_IMPL, unused) \
}
BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_INC(MY_JOIN_NODE_VARIADIC_MAX), MY_MAKE_IMPL, unused)
#undef MY_MAKE_IMPL
#undef MY_FUNC_IMPL
#undef MY_JOIN_NODE_VARIADIC_MAX
这段代码是一个函数定义。然后可以调用"makeAllEdges"。 (请注意,在这个例子中,我假设 makeAllEdges 是一个 class 方法,并且 vec_node 是 class 的一个成员,因此在 makeAllEdges 的范围内是已知的。)
我遇到过类似的情况,编译时函数节点的个数没有确定。就我而言,我通过以下方式解决了这个问题:
- 将每个传入 edge/flow 图节点与排队连接节点的一个端口连接起来。
- 也将相同的 edge/node 连接到多功能节点。在接收到的输入数量与传入边的数量匹配之前,此多功能节点不会输出任何内容。
- 将 multifunction_node 的输出端口连接到所有队列 join_nodes 的每个第二个端口。当内部计数达到限制时,multifunction_node 将释放缓冲区并将其存储的值转发到其相应的输出端口。
如果您想对所有数据执行单个操作而不是转发传入的缓冲消息,您可以通过随后在所有缓冲节点的输出端口上调用 try_get
来实现。
using namespace flow = tbb::flow;
template <class Input, class Output>
struct DynamicJoinNode {
using BufferNode = flow::join_node<std::tuple<Input, flow::continue_msg>, flow::queueing>;
using ContinueNode = flow::multifunction_node<Input, std::tuple<flow::continue_msg>>;
std::atomic<size_t> count;
std::vector<BufferNode> buffers;
ContinueNode customContinueNode;
template <class InputNodeIt>
DynamicJoinNode(flow::graph &graph, InputNodeIt first, InputNodeIt last) :
count(0),
buffering(std::distance(first, last), BufferNode(graph)),
customContinueNode(graph, [this](Input in, ContinueNode::output_ports_type &out) {
unsigned previous = count.load();
unsigned desired;
do {
desired = previous + 1;
if (desired == buffers.size())
desired = 0; // reached the last element: reset the count
} while (count.compare_exchange_weak(previous, desired));
if (desired) {
get<1>(out).try_put(flow::continue_msg{});
}
})
{
for (auto bufferIt = buffers.begin(); first != last; ++first, ++bufferIt) {
flow::make_edge(*first, flow::input_port<0>(*bufferIt));
flow::make_edge(*first, customContinueNode);
flow::make_edge(customContinueNode, flow::input_port<1>(*bufferIt));
}
}
};
我正在试验来自 TBB 的 Intel Graph Flow。我对结果非常满意,我发现这个产品非常棒,具有无限的可能性。然而,我遇到了一个我修复的 pb,但我不满意。 pb如下
message
A ----------\ tuple<message,message> WHATEVER
message join ------------------------- C------------------------
B ----------/
当我们想要同步并避免将消息(及其值)传播 n 次时应用此模式。英特尔提供了一个很好解释 pb 的例子(和解决方案 - Intel example)。我的pb是使用静态方法构造的元组和图的构造。它是完全静态的,特别是如果连接节点的输入边数(input_port<i>
在英特尔示例中)是变量。
TBB-graph 流的大师知道这个 pb 的 "dynamic approach" 吗?
最佳,
蒂姆 [编辑我的代码真实 pb]
我能做到:
std::vector<tbb::flow::function_node<std::size_t, message>> vec_node;
for (int i(0) ; i < 3 ;++i)
nodes_cont_.emplace_back(my_amazing_function_to_create_node(g_));
tbb::flow::make_edge(vec_node[0], tbb::flow::input_port<0>
tbb::flow::make_edge(vec_node[1], tbb::flow::input_port<1>(node_join_));
tbb::flow::make_edge(vec_node[2], tbb::flow::input_port<2>(node_join_));
我做不到:
for(int i(0); i < vec_node.size(); ++i)
tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));
由于 "tuple" 和 "tbb::flow::input_port" 函数。
join
节点上的端口数量是静态的(在编译时确定。)如果您希望一个输出的输入数量可变,您需要能够指出哪个 "port" 消息进来了,还有它的价值。
TBB 有一个封装端口号和值的变体类型(它是 indexer_node
的输出)。如果你使用那个类型(定义一个 indexer_node
,但不要实例化它,您可以使用节点的 ::output_type
作为 multifunction_node
的输入类型(它可能有多个输出,但可以只有一个输出),并让函数multifunction_node
的主体决定它何时具有正确数量的输出,然后您可以在输入时存储值,并且当 multifunction_node
看到 "correct " 输入数量时,它可以构造一个输出值并将其转发给它的后继者。
图形将如下所示:
我看到的一个问题是您必须定义 multifunction_node
的输出类型。这也是一个静态声明,尽管变体元组可能是您在那里需要的。
EDIT:
让我们做一些简化的假设:
- 虽然
N
在编译时未知,但在运行时已知且不变。放宽此约束将需要在每条消息中传递一些额外的数据。 - 尽管您使用的是
tuple
,但我相信那是因为join_nodes
的输出是tuple
(我尝试将向量添加为特例,但我不'认为这是一个选项。)我假设向量中的所有function_nodes
都与输出具有相同的类型。这样我们就可以避免使用变体类型。 - 我们传递的数据不是特别大(复制构造不是特别昂贵。)放宽此约束将需要更加小心地访问每个节点中的数据。
- 有一些东西可以唯一地定义在一起的消息。例如,如果您将只读数据缓冲区传递给向量中的每个
function_node
,则该缓冲区的地址就是让我们知道将哪些消息放在一起的部分。
我在TBB工作已经有几年了,所以可能有些事情我不知道,但我可以给你一个草图。
图形将如下所示:
(我实际上是在勾勒出标签匹配连接的结构,因为听起来这就是你想要的。)
当你构造function_nodes
的向量时,每个function_body
都需要知道它的索引是什么。一般来说,这意味着向量是指向 function_nodes
的指针,并且每个节点都是用索引作为其参数之一构造的。
我假设 source_node's
输出类似于缓冲区。该缓冲区被传递给向量中的每个 function_node
,并且每个 function_node
的输出类型为
- 缓冲区地址
- 节点向量中
function_node
的索引 - 其他神奇的好处
multifuncton_node
是完成大部分工作的地方。它有
hash_maps
的向量,由function_node
索引索引,并使用缓冲区地址的键,包含每个function_node
的各种缓冲区的结果。- a
hash_map
带有缓冲区地址的键,包含该缓冲区接收到的元素数量的计数。当它达到 N 时,你就拥有了所有的输入。
当 multifunction_node
收到消息时,它
- 将数据添加到
hash_map[i][key]
,其中i是function_node
索引(在输入消息中),key是缓冲区地址 - 增量
hash_count[key]
。如果现在是N
,那么 - 构造一个结果值向量,从该索引的散列 table 中提取每个值。
- 如果您构建了该值,则转发该值,否则仅 return.
对于数据的传递和存储方式,以及如果您希望重用值时如何清理元素,存在一些问题,但这是基本草图。
如果您在编译时知道特定程序的 N,但想以通用方式实现图形以供不同程序使用的库,BOOST_PP 是小 N 的一个选项。
我实现了一个图表,在最慢的连接节点输出 continue_msg 之后生成 continue_msg。为此,我需要 N 个缓冲区节点并将它们连接到具有 N 个相同类型端口的连接节点 (tbb::flow::continue_msg).
基本上,下面的代码可以满足您对
的预期for(int i(0); i < vec_node.size(); ++i)
tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));
...但是使用预编译器 "write" 多行正确的 make_edge 调用,但最多只有 N 个(对于 N < MY_JOIN_NODE_VARIADIC_MAX,该选择是任意的限制为 "small" N):
#include "boost/preprocessor/repetition/repeat_from_to.hpp"
#include "boost/preprocessor/repetition/repeat.hpp"
#include "boost/preprocessor/arithmetic/inc.hpp"
...
#define MY_JOIN_NODE_VARIADIC_MAX 8
#define MY_FUNC_IMPL(z, n, unused) tbb::flow::make_edge(vec_node[##n], tbb::flow::input_port<##n>(joinNode));
#define MY_MAKE_IMPL(z, n, unused) \
template <size_t N, typename TJoinNode> void \
makeAllEdges (TJoinNode& joinNode, \
typename std::enable_if< N == n >::type * = 0) \
{ \
BOOST_PP_REPEAT(n, MY_FUNC_IMPL, unused) \
}
BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_INC(MY_JOIN_NODE_VARIADIC_MAX), MY_MAKE_IMPL, unused)
#undef MY_MAKE_IMPL
#undef MY_FUNC_IMPL
#undef MY_JOIN_NODE_VARIADIC_MAX
这段代码是一个函数定义。然后可以调用"makeAllEdges"。 (请注意,在这个例子中,我假设 makeAllEdges 是一个 class 方法,并且 vec_node 是 class 的一个成员,因此在 makeAllEdges 的范围内是已知的。)
我遇到过类似的情况,编译时函数节点的个数没有确定。就我而言,我通过以下方式解决了这个问题:
- 将每个传入 edge/flow 图节点与排队连接节点的一个端口连接起来。
- 也将相同的 edge/node 连接到多功能节点。在接收到的输入数量与传入边的数量匹配之前,此多功能节点不会输出任何内容。
- 将 multifunction_node 的输出端口连接到所有队列 join_nodes 的每个第二个端口。当内部计数达到限制时,multifunction_node 将释放缓冲区并将其存储的值转发到其相应的输出端口。
如果您想对所有数据执行单个操作而不是转发传入的缓冲消息,您可以通过随后在所有缓冲节点的输出端口上调用 try_get
来实现。
using namespace flow = tbb::flow;
template <class Input, class Output>
struct DynamicJoinNode {
using BufferNode = flow::join_node<std::tuple<Input, flow::continue_msg>, flow::queueing>;
using ContinueNode = flow::multifunction_node<Input, std::tuple<flow::continue_msg>>;
std::atomic<size_t> count;
std::vector<BufferNode> buffers;
ContinueNode customContinueNode;
template <class InputNodeIt>
DynamicJoinNode(flow::graph &graph, InputNodeIt first, InputNodeIt last) :
count(0),
buffering(std::distance(first, last), BufferNode(graph)),
customContinueNode(graph, [this](Input in, ContinueNode::output_ports_type &out) {
unsigned previous = count.load();
unsigned desired;
do {
desired = previous + 1;
if (desired == buffers.size())
desired = 0; // reached the last element: reset the count
} while (count.compare_exchange_weak(previous, desired));
if (desired) {
get<1>(out).try_put(flow::continue_msg{});
}
})
{
for (auto bufferIt = buffers.begin(); first != last; ++first, ++bufferIt) {
flow::make_edge(*first, flow::input_port<0>(*bufferIt));
flow::make_edge(*first, customContinueNode);
flow::make_edge(customContinueNode, flow::input_port<1>(*bufferIt));
}
}
};