如何使 Intel TBB multifunction_node 具有动态端口数?

How to make Intel TBB multifunction_node with dynamic number of ports?

我是 英特尔 TBB 库 的新手。如您所见,我的问题与 tbb::flow::graph 有关。我需要实现如下逻辑:

用户用一些逻辑块绘制图形。每个块(节点)都可以有无限的连接(边),因此每个块(节点)都可以选择下一个放置数据的位置。然后我的程序将在 TBB 库的帮助下构建这样的图并执行计算。

所以我不知道是否可以构建具有动态输出端口数的节点(我猜它必须是multifunction_node)。你能告诉我怎么做吗?

不幸的是,没有办法(没有动态编译)改变 multifunction_node 中输出端口的数量。您可以创建最大数量的端口(由宏开关控制并取决于编译器),然后动态连接到端口。如果您对端口执行 try_put 并且没有附加后继者,则 try_put 失败并且您可以在运行时对此做出反应。

另一种方法(尽管我认为有些挫折)是构建双端口 multifunction_nodes 的二叉树。如果使用带有输出目标的 class 作为字段,构造每个节点以响应目标的一位并输出到端口 0 或端口 1,具体取决于掩码的结果。调度程序短路会相对快速地引导输出通过树,但您会为多个动态调用付出一些代价。

或者您可以使用 2 以外的其他基数(比如 10)

附录: 在与 Mike(flow::graph 的设计者)交谈后,我们意识到还有另一种方法可以解决这个问题,即允许动态数量的端口.你将不得不做一些低级的事情,但它是这样的:

#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;
struct multioutput_function_body {
    output_port_vector_t &my_ports;
    public:
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = my_ports.size();
        if(in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }

    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }

    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}

以上注意事项:

  • 我们正在创建 concurrent_vector 个指向 broadcast_nodes 的指针。 function_node 的后继者附加到这些 broadcast_nodesfunction_node 的输出被忽略。
  • concurrent_vector 被传递给 multioutput_function_body 的构造函数。在这种情况下,我们根本不需要 multifunction_node 。 multioutput_function_body 在运行时决定 broadcast_nodetry_put 到哪个。 注意 我们正在对 broadcast_nodes 进行显式 try_puts。这些导致为每个 try_put 生成一个任务。生成的任务比排队的任务更快,但调度开销比仅从节点返回值要多。
  • 我没有添加堆分配 broadcast_nodes 和输出 function_nodes 的清理。删除 broadcast_nodes 的 "obvious" 位置将在 multioutput_function_body 的析构函数中。您不应该这样做,因为 function_node 的创建会导致传入函数体的复制构造,并且 function_body 的多个副本将引用 concurrent_vector 的 broadcast_node 个指针。在g.wait_for_all()之后进行删除。

我使用 concurrent_vector 因为它允许在修改 concurrent_vector 时访问指针。在图的执行过程中是否可以添加额外的 broadcast_node 指针的问题是开放的。我希望您只是创建节点并按原样使用它们,而不是即时修改它们。 concurrent_vectors 在增长结构时不要重新分配和移动已经初始化的元素;这就是我使用它的原因,但如果您希望在图形为 运行.

时添加其他节点,请不要认为这是一个完整的答案