DPDK RTE_RING 中发送和接收单个消息的性能低下

low performance of send and receiving single message in RTE_RING of DPDK

地狱 Stack-Overflow 专家。

我想知道在使用 1 burst in (tx rx queue) 测试往返延迟时,是否对 rte_ring 导致的高延迟有一个很好的解释。

我已经使用两个节点(客户端和服务器)测试了性能 并计算了单次乒乓球往返的延迟时间。 延迟是在两个不同的测试用例上计算的 1. 直接从 rx tx 队列发送和接收的经过延迟。 2. 从 rte_ring 发送和接收经过的延迟。

这是我在 1 个突发 (tx rx)

上测试 DPDK 时产生的 RTE_RING 延迟的结果

DPDK+RING是使用RING发送乒乓消息的延时

DPDK 是直接将消息发送到 tx rx 队列所经过的延迟。

RING 是从 DPDK 延迟中减去 DPDK+RING 的假定延迟。

具有 1 个突发的直接 tx rx 队列的延迟 当我发送一条消息(512、1024、4096 字节)(每个请求 1 个突发)并从远程服务器接收到 1 个响应突发时。 延迟约为 4 ~ 8 微秒。

RTE_RING 1 个突发的延迟 当我使用 rte_ring 从客户端和服务器发送和接收数据时,延迟疯狂增加,从 59 微秒到 100 微秒。

RTE_RING 10 个突发的延迟 例如,当我使用突发时(每个请求 10 条消息) 并通过将总经过时间除以总乒乓消息(总延迟)/(总乒乓接收消息)来计算经过的延迟。 我可以使用 rte_ring 7 ~ 10 微秒获得非常好的性能。

我想知道是否有人可以告诉我应该看什么以减少 RTE_RING 的延迟。 因为即使我不使用多次突发,延迟也应该很低。

这是客户端的代码,用于将数据包添加到 tx-ring

    if (rte_ring_enqueue(tx_ring, client_txt) < 0) {
        printf("[user] Failed to send message - message discarded\n");
    } else {
        total_sent++;
        if (chara_debug) printf("[%d] Client txt data::[%.24s...]__length::[%ld]\n", total_sent++, client_txt, strlen(client_txt));
    }

这里是发送数据到 tx-queue 的代码

void
l2fwd_tx_loop()
{
    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    struct rte_mbuf *m;

    unsigned lcore_id;
    unsigned portid, nb_rx;
    struct lcore_queue_conf *qconf;
    struct rte_eth_dev_tx_buffer *buffer;

    lcore_id = 1;
    qconf = &lcore_queue_conf[lcore_id];

    struct rte_mbuf *rm[10];
    portid = qconf->rx_port_list[0];
    char* data;
    char* send_msg;
    struct message obj;
    struct fuse_message * e = NULL;
    char *msg, *_msg;
    void *__msg;
    int total_tx;


    while (!force_quit) {

        total_tx=0;
        while(total_tx<batch){
                if (rte_ring_dequeue(tx_ring, &__msg) < 0) {
                    usleep(5);
                    // sched_yield();

                    // printf("Failed to recv message - message discarded\n");
                } else {
                    _msg = (char *)__msg;
                    rm[total_tx] = rte_pktmbuf_alloc(test_pktmbuf_pool);
                    data = rte_pktmbuf_append(rm[total_tx], PKT_SIZE*sizeof(char));


                    if(strcmp(hostname,"c3n24")==0) {
                        data += sizeof(struct ether_hdr) - 2; // ASU SERVER
                        l2fwd_mac_updating(rm[total_tx], portid); // ASU SERVER
                    }

                    rte_memcpy(data, _msg, PKT_SIZE*sizeof(char));

                     if(PKT_SIZE==1024) printf("[%d]\n",total_tx);

                    if(chara_debug) printf("[%d] send msg in DPDK: %s",total_tx, _msg);
                    total_tx++;
                    // rte_pktmbuf_dump(stdout, rm[0], 60);
                }
        }


        int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
        for(int i=0; i<total_tx; i++) {
            rte_pktmbuf_free(rm[i]);
        }
    }
}

这里是从 rx 队列接收的代码

void
l2fwd_rx_loop() {
    struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
    struct rte_mbuf *m;

    unsigned lcore_id;
    unsigned i, j, portid, nb_rx;
    struct lcore_queue_conf *qconf;
    struct rte_eth_dev_tx_buffer *buffer;

    lcore_id = rte_lcore_id();
    qconf = &lcore_queue_conf[lcore_id];

    struct rte_mbuf *rm[1];

    while (!force_quit) {
        /*
         * Read packet from RX queues
         */
            portid = qconf->rx_port_list[0];
            nb_rx = rte_eth_rx_burst((uint8_t) portid, 0, pkts_burst, MAX_PKT_BURST);

            for (j = 0; j < nb_rx; j++) {
                m = pkts_burst[j];
                int rte_mbuf_packet_length = rte_pktmbuf_pkt_len(m);
                if (rte_mbuf_packet_length == (PKT_SIZE)) {
                    // rte_pktmbuf_dump(stdout, m, 60);
                    if(strcmp(hostname,"c3n24")==0) {
                        // dpdk_pktmbuf_dump(stdout, m, PKT_SIZE, sizeof(struct ether_hdr)-2);
                        dpdk_packet_process(rte_pktmbuf_mtod(m, void * ), PKT_SIZE, sizeof(struct ether_hdr) - 2);
                    }
                }
                rte_pktmbuf_free(m);
            }
    }

}

这里是从 rx-ring 接收数据的代码

    while (batched_packets<targ->batch) {
        if (rte_ring_dequeue(rx_ring, &_msg) < 0){
            usleep(5);
            // sched_yield();

        }
        else {
            recv_msg = (char *) _msg;
            if (chara_debug) printf("[%d] Server reply data::[%.24s...]__length::[%ld]\n", batched_packets, recv_msg, strlen(recv_msg));
            total_recved++;
            batched_packets++;
        }
    }

1。修复代码中的错误:

这个循环可能会引起各种麻烦:

    int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
    for (int i = 0; i < total_tx; i++) {
        rte_pktmbuf_free(rm[i]);
    }

rte_eth_tx_burst()释放缓冲区,所以我们不需要释放它们。我们只需要释放(或重试发送)我们传递给 rte_eth_tx_burst() 的缓冲区(在本例中为 total_tx)与 sent/enqueued 实际缓冲区数量之间的差异 rte_eth_tx_burst().

因此代码应如下所示:

    int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
    for (int i = rtn; i < total_tx; i++) { // Loop from rtn, not from 0
        rte_pktmbuf_free(rm[i]);
    }

2。看起来 usleep() 对你来说太长了

请注意,我们传递给 usleep() 的参数是 最小 时间间隔。

完全删除它们以确认。如果这是原因,请将 usleep()s 更改为:

  • rte_pause() -- 在给定的 CPU

  • 上可能出现的最短停顿
  • rte_delay_ms() / rte_delay_us() - 基本上在给定的时间间隔内重复 rte_pause()

  • sched_yield() -- 产生 CPU 给另一个线程(如果有的话)。基本上最短的usleep()

还要确保所有 printf() 都被注释掉或从代码中编译出来,因为每个 printf() 调用也会引入巨大的延迟...

感谢您的建议,Andriy Berestovskyy (如果您能根据我的发现得出您的专业建议,那么我会将其标记为合适的答案。)

我找到了答案,线程的设计导致了延迟问题。

rte_ring 应该一直忙于轮询。

计算延迟的线程正在发送带有 rte_ring tx 的数据包,但也从 rte_ring rx 接收数据包。这是延迟的主要原因。

所以我在实现dpdk的时候,需要确保rte_rings不被打断,一直做busy polling。因此,如果我使用 rte_ring tx 和 rte_ring rx,那么我需要使用两个分别执行 tx 和 rx 的线程。

如果我展示了所有的源代码,Andriy 可能会注意到这个问题, 但我尽量简化了源代码,这样我就能得到更多顾问的回应。

没想到线程的设计会造成这么大的延迟。