join_node 图流构建

join_node graph flow construction

我正在试验来自 TBB 的 Intel Graph Flow。我对结果非常满意,我发现这个产品非常棒,具有无限的可能性。然而,我遇到了一个我修复的 pb,但我不满意。 pb如下

   message
A ----------\       tuple<message,message>             WHATEVER
   message   join ------------------------- C------------------------
B ----------/

当我们想要同步并避免将消息(及其值)传播 n 次时应用此模式。英特尔提供了一个很好解释 pb 的例子(和解决方案 - Intel example)。我的pb是使用静态方法构造的元组和图的构造。它是完全静态的,特别是如果连接节点的输入边数(input_port<i> 在英特尔示例中)是变量。

TBB-graph 流的大师知道这个 pb 的 "dynamic approach" 吗?

最佳,

蒂姆 [编辑我的代码真实 pb]

我能做到:

std::vector<tbb::flow::function_node<std::size_t, message>> vec_node;

for (int i(0) ; i < 3 ;++i)
        nodes_cont_.emplace_back(my_amazing_function_to_create_node(g_));

tbb::flow::make_edge(vec_node[0], tbb::flow::input_port<0>

tbb::flow::make_edge(vec_node[1], tbb::flow::input_port<1>(node_join_));
tbb::flow::make_edge(vec_node[2], tbb::flow::input_port<2>(node_join_));

我做不到:

for(int i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));

由于 "tuple" 和 "tbb::flow::input_port" 函数。

join 节点上的端口数量是静态的(在编译时确定。)如果您希望一个输出的输入数量可变,您需要能够指出哪个 "port" 消息进来了,还有它的价值。

TBB 有一个封装端口号和值的变体类型(它是 indexer_node 的输出)。如果你使用那个类型(定义一个 indexer_node,但不要实例化它,您可以使用节点的 ::output_type 作为 multifunction_node 的输入类型(它可能有多个输出,但可以只有一个输出),并让函数multifunction_node 的主体决定它何时具有正确数量的输出,然后您可以在输入时存储值,并且当 multifunction_node 看到 "correct " 输入数量时,它可以构造一个输出值并将其转发给它的后继者。

图形将如下所示:

我看到的一个问题是您必须定义 multifunction_node 的输出类型。这也是一个静态声明,尽管变体元组可能是您在那里需要的。

EDIT:

让我们做一些简化的假设:

  • 虽然 N 在编译时未知,但在运行时已知且不变。放宽此约束将需要在每条消息中传递一些额外的数据。
  • 尽管您使用的是 tuple,但我相信那是因为 join_nodes 的输出是 tuple(我尝试将向量添加为特例,但我不'认为这是一个选项。)我假设向量中的所有 function_nodes 都与输出具有相同的类型。这样我们就可以避免使用变体类型。
  • 我们传递的数据不是特别大(复制构造不是特别昂贵。)放宽此约束将需要更加小心地访问每个节点中的数据。
  • 有一些东西可以唯一地定义在一起的消息。例如,如果您将只读数据缓冲区传递给向量中的每个 function_node,则该缓冲区的地址就是让我们知道将哪些消息放在一起的部分。

我在TBB工作已经有几年了,所以可能有些事情我不知道,但我可以给你一个草图。

图形将如下所示:

(我实际上是在勾勒出标签匹配连接的结构,因为听起来这就是你想要的。)

当你构造function_nodes的向量时,每个function_body都需要知道它的索引是什么。一般来说,这意味着向量是指向 function_nodes 的指针,并且每个节点都是用索引作为其参数之一构造的。

我假设 source_node's 输出类似于缓冲区。该缓冲区被传递给向量中的每个 function_node,并且每个 function_node 的输出类型为

  • 缓冲区地址
  • 节点向量中 function_node 的索引
  • 其他神奇的好处

multifuncton_node 是完成大部分工作的地方。它有

  • hash_maps 的向量,由 function_node 索引索引,并使用缓冲区地址的键,包含每个 function_node 的各种缓冲区的结果。
  • a hash_map 带有缓冲区地址的键,包含该缓冲区接收到的元素数量的计数。当它达到 N 时,你就拥有了所有的输入。

multifunction_node 收到消息时,它

  • 将数据添加到hash_map[i][key],其中i是function_node索引(在输入消息中),key是缓冲区地址
  • 增量hash_count[key]。如果现在是 N,那么
  • 构造一个结果值向量,从该索引的散列 table 中提取每个值。
  • 如果您构建了该值,则转发该值,否则仅 return.

对于数据的传递和存储方式,以及如果您希望重用值时如何清理元素,存在一些问题,但这是基本草图。

如果您在编译时知道特定程序的 N,但想以通用方式实现图形以供不同程序使用的库,BOOST_PP 是小 N 的一个选项。

我实现了一个图表,在最慢的连接节点输出 continue_msg 之后生成 continue_msg。为此,我需要 N 个缓冲区节点并将它们连接到具有 N 个相同类型端口的连接节点 (tbb::flow::continue_msg).

基本上,下面的代码可以满足您对

的预期
for(int i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));

...但是使用预编译器 "write" 多行正确的 make_edge 调用,但最多只有 N 个(对于 N < MY_JOIN_NODE_VARIADIC_MAX,该选择是任意的限制为 "small" N):

    #include "boost/preprocessor/repetition/repeat_from_to.hpp"
    #include "boost/preprocessor/repetition/repeat.hpp"
    #include "boost/preprocessor/arithmetic/inc.hpp"

    ...

    #define MY_JOIN_NODE_VARIADIC_MAX 8
    #define MY_FUNC_IMPL(z, n, unused) tbb::flow::make_edge(vec_node[##n], tbb::flow::input_port<##n>(joinNode));
    #define MY_MAKE_IMPL(z, n, unused)                                  \
    template <size_t N, typename TJoinNode> void                        \
    makeAllEdges (TJoinNode& joinNode,                                  \
                     typename std::enable_if< N == n >::type * = 0)     \
    {                                                                   \
        BOOST_PP_REPEAT(n, MY_FUNC_IMPL, unused)                          \
    }
    BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_INC(MY_JOIN_NODE_VARIADIC_MAX), MY_MAKE_IMPL, unused)
    #undef MY_MAKE_IMPL
    #undef MY_FUNC_IMPL
    #undef MY_JOIN_NODE_VARIADIC_MAX

这段代码是一个函数定义。然后可以调用"makeAllEdges"。 (请注意,在这个例子中,我假设 makeAllEdges 是一个 class 方法,并且 vec_node 是 class 的一个成员,因此在 makeAllEdges 的范围内是已知的。)

我遇到过类似的情况,编译时函数节点的个数没有确定。就我而言,我通过以下方式解决了这个问题:

  • 将每个传入 edge/flow 图节点与排队连接节点的一个端口连接起来。
  • 也将相同的 edge/node 连接到多功能节点。在接收到的输入数量与传入边的数量匹配之前,此多功能节点不会输出任何内容。
  • 将 multifunction_node 的输出端口连接到所有队列 join_nodes 的每个第二个端口。当内部计数达到限制时,multifunction_node 将释放缓冲区并将其存储的值转发到其相应的输出端口。

如果您想对所有数据执行单个操作而不是转发传入的缓冲消息,您可以通过随后在所有缓冲节点的输出端口上调用 try_get 来实现。

using namespace flow = tbb::flow;

template <class Input, class Output>
struct DynamicJoinNode {
  using BufferNode = flow::join_node<std::tuple<Input, flow::continue_msg>, flow::queueing>;
  using ContinueNode = flow::multifunction_node<Input, std::tuple<flow::continue_msg>>;

  std::atomic<size_t> count;
  std::vector<BufferNode> buffers;
  ContinueNode customContinueNode;

  template <class InputNodeIt>
  DynamicJoinNode(flow::graph &graph, InputNodeIt first, InputNodeIt last) :
    count(0),
    buffering(std::distance(first, last), BufferNode(graph)),
    customContinueNode(graph, [this](Input in, ContinueNode::output_ports_type &out) {
      unsigned previous = count.load();
      unsigned desired;
      do {
       desired = previous + 1;
       if (desired == buffers.size())
         desired = 0; // reached the last element: reset the count
      } while (count.compare_exchange_weak(previous, desired));
      if (desired) {
        get<1>(out).try_put(flow::continue_msg{});
      }
    })
  {    
    for (auto bufferIt = buffers.begin(); first != last; ++first, ++bufferIt) {
      flow::make_edge(*first, flow::input_port<0>(*bufferIt));
      flow::make_edge(*first, customContinueNode);
      flow::make_edge(customContinueNode, flow::input_port<1>(*bufferIt));
    }
  }
};