DPDK RTE_RING 中发送和接收单个消息的性能低下
low performance of send and receiving single message in RTE_RING of DPDK
地狱 Stack-Overflow 专家。
我想知道在使用 1 burst in (tx rx queue) 测试往返延迟时,是否对 rte_ring 导致的高延迟有一个很好的解释。
我已经使用两个节点(客户端和服务器)测试了性能
并计算了单次乒乓球往返的延迟时间。
延迟是在两个不同的测试用例上计算的
1. 直接从 rx tx 队列发送和接收的经过延迟。
2. 从 rte_ring 发送和接收经过的延迟。
这是我在 1 个突发 (tx rx)
上测试 DPDK 时产生的 RTE_RING 延迟的结果
DPDK+RING是使用RING发送乒乓消息的延时
DPDK 是直接将消息发送到 tx rx 队列所经过的延迟。
RING 是从 DPDK 延迟中减去 DPDK+RING 的假定延迟。
具有 1 个突发的直接 tx rx 队列的延迟
当我发送一条消息(512、1024、4096 字节)(每个请求 1 个突发)并从远程服务器接收到 1 个响应突发时。
延迟约为 4 ~ 8 微秒。
RTE_RING 1 个突发的延迟
当我使用 rte_ring 从客户端和服务器发送和接收数据时,延迟疯狂增加,从 59 微秒到 100 微秒。
RTE_RING 10 个突发的延迟
例如,当我使用突发时(每个请求 10 条消息)
并通过将总经过时间除以总乒乓消息(总延迟)/(总乒乓接收消息)来计算经过的延迟。
我可以使用 rte_ring 7 ~ 10 微秒获得非常好的性能。
我想知道是否有人可以告诉我应该看什么以减少 RTE_RING 的延迟。
因为即使我不使用多次突发,延迟也应该很低。
这是客户端的代码,用于将数据包添加到 tx-ring
if (rte_ring_enqueue(tx_ring, client_txt) < 0) {
printf("[user] Failed to send message - message discarded\n");
} else {
total_sent++;
if (chara_debug) printf("[%d] Client txt data::[%.24s...]__length::[%ld]\n", total_sent++, client_txt, strlen(client_txt));
}
这里是发送数据到 tx-queue 的代码
void
l2fwd_tx_loop()
{
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
struct rte_mbuf *m;
unsigned lcore_id;
unsigned portid, nb_rx;
struct lcore_queue_conf *qconf;
struct rte_eth_dev_tx_buffer *buffer;
lcore_id = 1;
qconf = &lcore_queue_conf[lcore_id];
struct rte_mbuf *rm[10];
portid = qconf->rx_port_list[0];
char* data;
char* send_msg;
struct message obj;
struct fuse_message * e = NULL;
char *msg, *_msg;
void *__msg;
int total_tx;
while (!force_quit) {
total_tx=0;
while(total_tx<batch){
if (rte_ring_dequeue(tx_ring, &__msg) < 0) {
usleep(5);
// sched_yield();
// printf("Failed to recv message - message discarded\n");
} else {
_msg = (char *)__msg;
rm[total_tx] = rte_pktmbuf_alloc(test_pktmbuf_pool);
data = rte_pktmbuf_append(rm[total_tx], PKT_SIZE*sizeof(char));
if(strcmp(hostname,"c3n24")==0) {
data += sizeof(struct ether_hdr) - 2; // ASU SERVER
l2fwd_mac_updating(rm[total_tx], portid); // ASU SERVER
}
rte_memcpy(data, _msg, PKT_SIZE*sizeof(char));
if(PKT_SIZE==1024) printf("[%d]\n",total_tx);
if(chara_debug) printf("[%d] send msg in DPDK: %s",total_tx, _msg);
total_tx++;
// rte_pktmbuf_dump(stdout, rm[0], 60);
}
}
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for(int i=0; i<total_tx; i++) {
rte_pktmbuf_free(rm[i]);
}
}
}
这里是从 rx 队列接收的代码
void
l2fwd_rx_loop() {
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
struct rte_mbuf *m;
unsigned lcore_id;
unsigned i, j, portid, nb_rx;
struct lcore_queue_conf *qconf;
struct rte_eth_dev_tx_buffer *buffer;
lcore_id = rte_lcore_id();
qconf = &lcore_queue_conf[lcore_id];
struct rte_mbuf *rm[1];
while (!force_quit) {
/*
* Read packet from RX queues
*/
portid = qconf->rx_port_list[0];
nb_rx = rte_eth_rx_burst((uint8_t) portid, 0, pkts_burst, MAX_PKT_BURST);
for (j = 0; j < nb_rx; j++) {
m = pkts_burst[j];
int rte_mbuf_packet_length = rte_pktmbuf_pkt_len(m);
if (rte_mbuf_packet_length == (PKT_SIZE)) {
// rte_pktmbuf_dump(stdout, m, 60);
if(strcmp(hostname,"c3n24")==0) {
// dpdk_pktmbuf_dump(stdout, m, PKT_SIZE, sizeof(struct ether_hdr)-2);
dpdk_packet_process(rte_pktmbuf_mtod(m, void * ), PKT_SIZE, sizeof(struct ether_hdr) - 2);
}
}
rte_pktmbuf_free(m);
}
}
}
这里是从 rx-ring 接收数据的代码
while (batched_packets<targ->batch) {
if (rte_ring_dequeue(rx_ring, &_msg) < 0){
usleep(5);
// sched_yield();
}
else {
recv_msg = (char *) _msg;
if (chara_debug) printf("[%d] Server reply data::[%.24s...]__length::[%ld]\n", batched_packets, recv_msg, strlen(recv_msg));
total_recved++;
batched_packets++;
}
}
1。修复代码中的错误:
这个循环可能会引起各种麻烦:
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for (int i = 0; i < total_tx; i++) {
rte_pktmbuf_free(rm[i]);
}
rte_eth_tx_burst()
释放缓冲区,所以我们不需要释放它们。我们只需要释放(或重试发送)我们传递给 rte_eth_tx_burst()
的缓冲区(在本例中为 total_tx
)与 sent/enqueued 实际缓冲区数量之间的差异 rte_eth_tx_burst()
.
因此代码应如下所示:
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for (int i = rtn; i < total_tx; i++) { // Loop from rtn, not from 0
rte_pktmbuf_free(rm[i]);
}
2。看起来 usleep()
对你来说太长了
请注意,我们传递给 usleep()
的参数是 最小 时间间隔。
完全删除它们以确认。如果这是原因,请将 usleep()
s 更改为:
rte_pause()
-- 在给定的 CPU
上可能出现的最短停顿
rte_delay_ms()
/ rte_delay_us()
- 基本上在给定的时间间隔内重复 rte_pause()
。
sched_yield()
-- 产生 CPU 给另一个线程(如果有的话)。基本上最短的usleep()
还要确保所有 printf()
都被注释掉或从代码中编译出来,因为每个 printf()
调用也会引入巨大的延迟...
感谢您的建议,Andriy Berestovskyy
(如果您能根据我的发现得出您的专业建议,那么我会将其标记为合适的答案。)
我找到了答案,线程的设计导致了延迟问题。
rte_ring 应该一直忙于轮询。
计算延迟的线程正在发送带有 rte_ring tx 的数据包,但也从 rte_ring rx 接收数据包。这是延迟的主要原因。
所以我在实现dpdk的时候,需要确保rte_rings不被打断,一直做busy polling。因此,如果我使用 rte_ring tx 和 rte_ring rx,那么我需要使用两个分别执行 tx 和 rx 的线程。
如果我展示了所有的源代码,Andriy 可能会注意到这个问题,
但我尽量简化了源代码,这样我就能得到更多顾问的回应。
没想到线程的设计会造成这么大的延迟。
地狱 Stack-Overflow 专家。
我想知道在使用 1 burst in (tx rx queue) 测试往返延迟时,是否对 rte_ring 导致的高延迟有一个很好的解释。
我已经使用两个节点(客户端和服务器)测试了性能 并计算了单次乒乓球往返的延迟时间。 延迟是在两个不同的测试用例上计算的 1. 直接从 rx tx 队列发送和接收的经过延迟。 2. 从 rte_ring 发送和接收经过的延迟。
这是我在 1 个突发 (tx rx)
上测试 DPDK 时产生的 RTE_RING 延迟的结果DPDK+RING是使用RING发送乒乓消息的延时
DPDK 是直接将消息发送到 tx rx 队列所经过的延迟。
RING 是从 DPDK 延迟中减去 DPDK+RING 的假定延迟。
具有 1 个突发的直接 tx rx 队列的延迟 当我发送一条消息(512、1024、4096 字节)(每个请求 1 个突发)并从远程服务器接收到 1 个响应突发时。 延迟约为 4 ~ 8 微秒。
RTE_RING 1 个突发的延迟 当我使用 rte_ring 从客户端和服务器发送和接收数据时,延迟疯狂增加,从 59 微秒到 100 微秒。
RTE_RING 10 个突发的延迟 例如,当我使用突发时(每个请求 10 条消息) 并通过将总经过时间除以总乒乓消息(总延迟)/(总乒乓接收消息)来计算经过的延迟。 我可以使用 rte_ring 7 ~ 10 微秒获得非常好的性能。
我想知道是否有人可以告诉我应该看什么以减少 RTE_RING 的延迟。 因为即使我不使用多次突发,延迟也应该很低。
这是客户端的代码,用于将数据包添加到 tx-ring
if (rte_ring_enqueue(tx_ring, client_txt) < 0) {
printf("[user] Failed to send message - message discarded\n");
} else {
total_sent++;
if (chara_debug) printf("[%d] Client txt data::[%.24s...]__length::[%ld]\n", total_sent++, client_txt, strlen(client_txt));
}
这里是发送数据到 tx-queue 的代码
void
l2fwd_tx_loop()
{
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
struct rte_mbuf *m;
unsigned lcore_id;
unsigned portid, nb_rx;
struct lcore_queue_conf *qconf;
struct rte_eth_dev_tx_buffer *buffer;
lcore_id = 1;
qconf = &lcore_queue_conf[lcore_id];
struct rte_mbuf *rm[10];
portid = qconf->rx_port_list[0];
char* data;
char* send_msg;
struct message obj;
struct fuse_message * e = NULL;
char *msg, *_msg;
void *__msg;
int total_tx;
while (!force_quit) {
total_tx=0;
while(total_tx<batch){
if (rte_ring_dequeue(tx_ring, &__msg) < 0) {
usleep(5);
// sched_yield();
// printf("Failed to recv message - message discarded\n");
} else {
_msg = (char *)__msg;
rm[total_tx] = rte_pktmbuf_alloc(test_pktmbuf_pool);
data = rte_pktmbuf_append(rm[total_tx], PKT_SIZE*sizeof(char));
if(strcmp(hostname,"c3n24")==0) {
data += sizeof(struct ether_hdr) - 2; // ASU SERVER
l2fwd_mac_updating(rm[total_tx], portid); // ASU SERVER
}
rte_memcpy(data, _msg, PKT_SIZE*sizeof(char));
if(PKT_SIZE==1024) printf("[%d]\n",total_tx);
if(chara_debug) printf("[%d] send msg in DPDK: %s",total_tx, _msg);
total_tx++;
// rte_pktmbuf_dump(stdout, rm[0], 60);
}
}
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for(int i=0; i<total_tx; i++) {
rte_pktmbuf_free(rm[i]);
}
}
}
这里是从 rx 队列接收的代码
void
l2fwd_rx_loop() {
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
struct rte_mbuf *m;
unsigned lcore_id;
unsigned i, j, portid, nb_rx;
struct lcore_queue_conf *qconf;
struct rte_eth_dev_tx_buffer *buffer;
lcore_id = rte_lcore_id();
qconf = &lcore_queue_conf[lcore_id];
struct rte_mbuf *rm[1];
while (!force_quit) {
/*
* Read packet from RX queues
*/
portid = qconf->rx_port_list[0];
nb_rx = rte_eth_rx_burst((uint8_t) portid, 0, pkts_burst, MAX_PKT_BURST);
for (j = 0; j < nb_rx; j++) {
m = pkts_burst[j];
int rte_mbuf_packet_length = rte_pktmbuf_pkt_len(m);
if (rte_mbuf_packet_length == (PKT_SIZE)) {
// rte_pktmbuf_dump(stdout, m, 60);
if(strcmp(hostname,"c3n24")==0) {
// dpdk_pktmbuf_dump(stdout, m, PKT_SIZE, sizeof(struct ether_hdr)-2);
dpdk_packet_process(rte_pktmbuf_mtod(m, void * ), PKT_SIZE, sizeof(struct ether_hdr) - 2);
}
}
rte_pktmbuf_free(m);
}
}
}
这里是从 rx-ring 接收数据的代码
while (batched_packets<targ->batch) {
if (rte_ring_dequeue(rx_ring, &_msg) < 0){
usleep(5);
// sched_yield();
}
else {
recv_msg = (char *) _msg;
if (chara_debug) printf("[%d] Server reply data::[%.24s...]__length::[%ld]\n", batched_packets, recv_msg, strlen(recv_msg));
total_recved++;
batched_packets++;
}
}
1。修复代码中的错误:
这个循环可能会引起各种麻烦:
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for (int i = 0; i < total_tx; i++) {
rte_pktmbuf_free(rm[i]);
}
rte_eth_tx_burst()
释放缓冲区,所以我们不需要释放它们。我们只需要释放(或重试发送)我们传递给 rte_eth_tx_burst()
的缓冲区(在本例中为 total_tx
)与 sent/enqueued 实际缓冲区数量之间的差异 rte_eth_tx_burst()
.
因此代码应如下所示:
int rtn = rte_eth_tx_burst(portid, 0, rm, total_tx);
for (int i = rtn; i < total_tx; i++) { // Loop from rtn, not from 0
rte_pktmbuf_free(rm[i]);
}
2。看起来 usleep()
对你来说太长了
请注意,我们传递给 usleep()
的参数是 最小 时间间隔。
完全删除它们以确认。如果这是原因,请将 usleep()
s 更改为:
rte_pause()
-- 在给定的 CPU 上可能出现的最短停顿
rte_delay_ms()
/rte_delay_us()
- 基本上在给定的时间间隔内重复rte_pause()
。sched_yield()
-- 产生 CPU 给另一个线程(如果有的话)。基本上最短的usleep()
还要确保所有 printf()
都被注释掉或从代码中编译出来,因为每个 printf()
调用也会引入巨大的延迟...
感谢您的建议,Andriy Berestovskyy (如果您能根据我的发现得出您的专业建议,那么我会将其标记为合适的答案。)
我找到了答案,线程的设计导致了延迟问题。
rte_ring 应该一直忙于轮询。
计算延迟的线程正在发送带有 rte_ring tx 的数据包,但也从 rte_ring rx 接收数据包。这是延迟的主要原因。
所以我在实现dpdk的时候,需要确保rte_rings不被打断,一直做busy polling。因此,如果我使用 rte_ring tx 和 rte_ring rx,那么我需要使用两个分别执行 tx 和 rx 的线程。
如果我展示了所有的源代码,Andriy 可能会注意到这个问题, 但我尽量简化了源代码,这样我就能得到更多顾问的回应。
没想到线程的设计会造成这么大的延迟。