DPDK 17.11.1 - 在进行基于目的地的速率限制时看到掉落
DPDK 17.11.1 - drops seen when doing destination based rate limiting
编辑问题陈述以更多地突出核心逻辑
我们在进行基于目标的速率限制时遇到了性能问题。
我们为每个 {destination-src} 对(最多 100 个目的地和 2^16 个源)维护状态。我们有一个包含 100 个节点的数组,每个节点都有一个 rte_hash*。这个散列 table 将维护该目的地看到的每个源 ip 的状态。我们为每个看到的目的地(0 到 100)都有一个映射,这用于索引到数组中。如果特定来源在一秒钟内超过了为此目的地定义的阈值,我们将阻止该来源,否则我们将允许该来源。在运行时,当我们只看到 2 或 3 个目的地的流量时,没有问题,但当我们超过 5 个时,我们会看到很多丢弃。我们的函数必须进行查找并识别与 dest_ip 和 src_ip 匹配的流。处理流并决定它是否需要丢弃。如果找不到该流,则将其添加到哈希中。
struct flow_state {
struct rte_hash* hash;
};
struct flow_state flow_state_arr[100];
// 我将在 pipeline_init 使用 rte_hash_create 创建这些散列 tables 并在 pipeline_free.
期间释放它们
我正在概述我们在伪代码中所做的事情。
run()
{
1) do rx
2) from the pkt, get index into the flow_state_arr and retrieve the rte_hash* handle
3) rte_hash_lookup_data(hash, src_ip,flow_data)
4) if entry found, take decision on the flow (the decision is simply say rate limiting the flow)
5) else rte_hash_add_data(hash,src_ip,new_flow_data) to add the flow to table and forward
}
请指导我们是否可以在数据路径中拥有这些多个哈希 table 对象,或者如果我们需要分别处理每个目的地的状态,最好的方法是什么。
编辑
谢谢回答。我很乐意分享代码片段和我们收集的结果。我没有其他 DPDK 版本的比较结果,但下面是我们使用 17.11.1.
测试的一些结果
测试设置
我正在为 3 个目的地 14.143.156.x(在本例中为 101,102,103)使用 IXIA 流量生成器(使用两个 10G 链路生成 12Mpps)。每个目的地的流量来自 2^16 个不同的来源。这是流量生成设置。
代码段
struct flow_state_t {
struct rte_hash* hash;
uint32_t size;
uint64_t threshold;
};
struct flow_data_t {
uint8_t curr_state; // 0 if blocked, 1 if allowed
uint64_t pps_count;
uint64_t src_first_seen;
};
struct pipeline_ratelimit {
struct pipeline p;
struct pipeline_ratelimit_params params;
rte_table_hash_op_hash f_hash;
uint32_t swap_field0_offset[SWAP_DIM];
uint32_t swap_field1_offset[SWAP_DIM];
uint64_t swap_field_mask[SWAP_DIM];
uint32_t swap_n_fields;
pipeline_msg_req_handler custom_handlers[2]; // handlers for add and del
struct flow_state_t flow_state_arr[100];
struct flow_data_t flows[100][65536];
} __rte_cache_aligned;
/*
add_handler(pipeline,msg) -- msg includes index and threshold
In the add handler
a rule/ threshold is added for a destination
rte_hash_create and store rte_hash* in flow_state_arr[index]
max of 100 destinations or rules are allowed
previous pipelines add the ID (index) to the packet to look in to the
flow_state_arr for the rule
*/
/*
del_handler(pipeline,msg) -- msg includes index
In the del handler
a rule/ threshold @index is deleted
the associated rte_hash* is also freed
the slot is made free
*/
#define ALLOWED 1
#define BLOCKED 0
#define TABLE_MAX_CAPACITY 65536
int do_rate_limit(struct pipeline_ratelimit* ps, uint32_t id, unsigned char* pkt)
{
uint64_t curr_time_stamp = rte_get_timer_cycles();
struct iphdr* iph = (struct iphdr*)pkt;
uint32_t src_ip = rte_be_to_cpu_32(iph->saddr);
struct flow_state_t* node = &ps->flow_state_arr[id];
struct flow_data_t* flow = NULL
rte_hash_lookup_data(node->hash, &src_ip, (void**)&flow);
if (flow != NULL)
{
if (flow->curr_state == ALLOWED)
{
if (flow->pps_count++ > node->threshold)
{
uint64_t seconds_elapsed = (curr_time_stamp - flow->src_first_seen) / CYCLES_IN_1SEC;
if (seconds_elapsed)
{
flow->src_first_seen += seconds_elapsed * CYCLES_IN_1_SEC;
flow->pps_count = 1;
return ALLOWED;
}
else
{
flow->pps_count = 0;
flow->curr_state = BLOCKED;
return BLOCKED;
}
}
return ALLOWED;
}
else
{
uint64_t seconds_elapsed = (curr_time_stamp - flow->src_first_seen) / CYCLES_IN_1SEC;
if (seconds_elapsed > 120)
{
flow->curr_state = ALLOWED;
flow->pps_count = 0;
flow->src_first_seen += seconds_elapsed * CYCLES_IN_1_SEC;
return ALLOWED;
}
return BLOCKED;
}
}
int index = node->size;
// If entry not found and we have reached capacity
// Remove the rear element and mark it as the index for the new node
if (node->size == TABLE_MAX_CAPACITY)
{
rte_hash_reset(node->hash);
index = node->size = 0;
}
// Add new element @packet_flows[mit_id][index]
struct flow_data_t* flow_data = &ps->flows[id][index];
*flow_data = { ALLOWED, 1, curr_time_stamp };
node->size++;
// Add the new key to hash
rte_hash_add_key_data(node->hash, (void*)&src_ip, (void*)flow_data);
return ALLOWED;
}
static int pipeline_ratelimit_run(void* pipeline)
{
struct pipeline_ratelimit* ps = (struct pipeline_ratelimit*)pipeline;
struct rte_port_in* port_in = p->port_in_next;
struct rte_port_out* port_out = &p->ports_out[0];
struct rte_port_out* port_drop = &p->ports_out[2];
uint8_t valid_pkt_cnt = 0, invalid_pkt_cnt = 0;
struct rte_mbuf* valid_pkts[RTE_PORT_IN_BURST_SIZE_MAX];
struct rte_mbuf* invalid_pkts[RTE_PORT_IN_BURST_SIZE_MAX];
memset(valid_pkts, 0, sizeof(valid_pkts));
memset(invalid_pkts, 0, sizeof(invalid_pkts));
uint64_t n_pkts;
if (unlikely(port_in == NULL)) {
return 0;
}
/* Input port RX */
n_pkts = port_in->ops.f_rx(port_in->h_port, p->pkts,
port_in->burst_size);
if (n_pkts == 0)
{
p->port_in_next = port_in->next;
return 0;
}
uint32_t rc = 0;
char* rx_pkt = NULL;
for (j = 0; j < n_pkts; j++) {
struct rte_mbuf* m = p->pkts[j];
rx_pkt = rte_pktmbuf_mtod(m, char*);
uint32_t id = rte_be_to_cpu_32(*(uint32_t*)(rx_pkt - sizeof(uint32_t)));
unsigned short packet_len = rte_be_to_cpu_16(*((unsigned short*)(rx_pkt + 16)));
struct flow_state_t* node = &(ps->flow_state_arr[id]);
if (node->hash && node->threshold != 0)
{
// Decide whether to allow of drop the packet
// returns allow - 1, drop - 0
if (do_rate_limit(ps, id, (unsigned char*)(rx_pkt + 14)))
valid_pkts[valid_pkt_count++] = m;
else
invalid_pkts[invalid_pkt_count++] = m;
}
else
valid_pkts[valid_pkt_count++] = m;
if (invalid_pkt_cnt) {
p->pkts_mask = 0;
rte_memcpy(p->pkts, invalid_pkts, sizeof(invalid_pkts));
p->pkts_mask = RTE_LEN2MASK(invalid_pkt_cnt, uint64_t);
rte_pipeline_action_handler_port_bulk_mod(p, p->pkts_mask, port_drop);
}
p->pkts_mask = 0;
memset(p->pkts, 0, sizeof(p->pkts));
if (valid_pkt_cnt != 0)
{
rte_memcpy(p->pkts, valid_pkts, sizeof(valid_pkts));
p->pkts_mask = RTE_LEN2MASK(valid_pkt_cnt, uint64_t);
}
rte_pipeline_action_handler_port_bulk_mod(p, p->pkts_mask, port_out);
/* Pick candidate for next port IN to serve */
p->port_in_next = port_in->next;
return (int)n_pkts;
}
}
结果
- 当从 60000 个源中仅为一个目标生成流量且阈值为 14Mpps 时,没有丢包。我们能够从 IXIA 发送 12Mpps 并接收 12Mpps
- 添加 3 个或更多目的地(每个目的地配置为从 60000 个来源接收流量)后观察到掉线。吞吐量仅为 8-9 Mpps。当发送到 100 个目的地(每个目的地 60000 个 src)时,仅处理了 6.4Mpps。下降了 50%。
- 在 运行 上,通过 vtune-profiler,它报告 rte_hash_lookup_data 为热点并且主要是内存绑定(DRAM 绑定)。我会尽快附上vtune报告。
根据内部测试的更新,rte_hash
库不会导致性能下降。因此,正如评论中所建议的那样,更有可能是由于当前的模式和算法设计可能导致缓存未命中和每个周期的指令较少。
要确定是前端停顿还是后端管道停顿还是内存停顿,请使用 perf
或 vtune
。还要尽量减少分支并使用更多 likely
和 prefetch
。
编辑问题陈述以更多地突出核心逻辑
我们在进行基于目标的速率限制时遇到了性能问题。 我们为每个 {destination-src} 对(最多 100 个目的地和 2^16 个源)维护状态。我们有一个包含 100 个节点的数组,每个节点都有一个 rte_hash*。这个散列 table 将维护该目的地看到的每个源 ip 的状态。我们为每个看到的目的地(0 到 100)都有一个映射,这用于索引到数组中。如果特定来源在一秒钟内超过了为此目的地定义的阈值,我们将阻止该来源,否则我们将允许该来源。在运行时,当我们只看到 2 或 3 个目的地的流量时,没有问题,但当我们超过 5 个时,我们会看到很多丢弃。我们的函数必须进行查找并识别与 dest_ip 和 src_ip 匹配的流。处理流并决定它是否需要丢弃。如果找不到该流,则将其添加到哈希中。
struct flow_state {
struct rte_hash* hash;
};
struct flow_state flow_state_arr[100];
// 我将在 pipeline_init 使用 rte_hash_create 创建这些散列 tables 并在 pipeline_free.
期间释放它们我正在概述我们在伪代码中所做的事情。
run()
{
1) do rx
2) from the pkt, get index into the flow_state_arr and retrieve the rte_hash* handle
3) rte_hash_lookup_data(hash, src_ip,flow_data)
4) if entry found, take decision on the flow (the decision is simply say rate limiting the flow)
5) else rte_hash_add_data(hash,src_ip,new_flow_data) to add the flow to table and forward
}
请指导我们是否可以在数据路径中拥有这些多个哈希 table 对象,或者如果我们需要分别处理每个目的地的状态,最好的方法是什么。
编辑
谢谢回答。我很乐意分享代码片段和我们收集的结果。我没有其他 DPDK 版本的比较结果,但下面是我们使用 17.11.1.
测试设置
我正在为 3 个目的地 14.143.156.x(在本例中为 101,102,103)使用 IXIA 流量生成器(使用两个 10G 链路生成 12Mpps)。每个目的地的流量来自 2^16 个不同的来源。这是流量生成设置。
代码段
struct flow_state_t {
struct rte_hash* hash;
uint32_t size;
uint64_t threshold;
};
struct flow_data_t {
uint8_t curr_state; // 0 if blocked, 1 if allowed
uint64_t pps_count;
uint64_t src_first_seen;
};
struct pipeline_ratelimit {
struct pipeline p;
struct pipeline_ratelimit_params params;
rte_table_hash_op_hash f_hash;
uint32_t swap_field0_offset[SWAP_DIM];
uint32_t swap_field1_offset[SWAP_DIM];
uint64_t swap_field_mask[SWAP_DIM];
uint32_t swap_n_fields;
pipeline_msg_req_handler custom_handlers[2]; // handlers for add and del
struct flow_state_t flow_state_arr[100];
struct flow_data_t flows[100][65536];
} __rte_cache_aligned;
/*
add_handler(pipeline,msg) -- msg includes index and threshold
In the add handler
a rule/ threshold is added for a destination
rte_hash_create and store rte_hash* in flow_state_arr[index]
max of 100 destinations or rules are allowed
previous pipelines add the ID (index) to the packet to look in to the
flow_state_arr for the rule
*/
/*
del_handler(pipeline,msg) -- msg includes index
In the del handler
a rule/ threshold @index is deleted
the associated rte_hash* is also freed
the slot is made free
*/
#define ALLOWED 1
#define BLOCKED 0
#define TABLE_MAX_CAPACITY 65536
int do_rate_limit(struct pipeline_ratelimit* ps, uint32_t id, unsigned char* pkt)
{
uint64_t curr_time_stamp = rte_get_timer_cycles();
struct iphdr* iph = (struct iphdr*)pkt;
uint32_t src_ip = rte_be_to_cpu_32(iph->saddr);
struct flow_state_t* node = &ps->flow_state_arr[id];
struct flow_data_t* flow = NULL
rte_hash_lookup_data(node->hash, &src_ip, (void**)&flow);
if (flow != NULL)
{
if (flow->curr_state == ALLOWED)
{
if (flow->pps_count++ > node->threshold)
{
uint64_t seconds_elapsed = (curr_time_stamp - flow->src_first_seen) / CYCLES_IN_1SEC;
if (seconds_elapsed)
{
flow->src_first_seen += seconds_elapsed * CYCLES_IN_1_SEC;
flow->pps_count = 1;
return ALLOWED;
}
else
{
flow->pps_count = 0;
flow->curr_state = BLOCKED;
return BLOCKED;
}
}
return ALLOWED;
}
else
{
uint64_t seconds_elapsed = (curr_time_stamp - flow->src_first_seen) / CYCLES_IN_1SEC;
if (seconds_elapsed > 120)
{
flow->curr_state = ALLOWED;
flow->pps_count = 0;
flow->src_first_seen += seconds_elapsed * CYCLES_IN_1_SEC;
return ALLOWED;
}
return BLOCKED;
}
}
int index = node->size;
// If entry not found and we have reached capacity
// Remove the rear element and mark it as the index for the new node
if (node->size == TABLE_MAX_CAPACITY)
{
rte_hash_reset(node->hash);
index = node->size = 0;
}
// Add new element @packet_flows[mit_id][index]
struct flow_data_t* flow_data = &ps->flows[id][index];
*flow_data = { ALLOWED, 1, curr_time_stamp };
node->size++;
// Add the new key to hash
rte_hash_add_key_data(node->hash, (void*)&src_ip, (void*)flow_data);
return ALLOWED;
}
static int pipeline_ratelimit_run(void* pipeline)
{
struct pipeline_ratelimit* ps = (struct pipeline_ratelimit*)pipeline;
struct rte_port_in* port_in = p->port_in_next;
struct rte_port_out* port_out = &p->ports_out[0];
struct rte_port_out* port_drop = &p->ports_out[2];
uint8_t valid_pkt_cnt = 0, invalid_pkt_cnt = 0;
struct rte_mbuf* valid_pkts[RTE_PORT_IN_BURST_SIZE_MAX];
struct rte_mbuf* invalid_pkts[RTE_PORT_IN_BURST_SIZE_MAX];
memset(valid_pkts, 0, sizeof(valid_pkts));
memset(invalid_pkts, 0, sizeof(invalid_pkts));
uint64_t n_pkts;
if (unlikely(port_in == NULL)) {
return 0;
}
/* Input port RX */
n_pkts = port_in->ops.f_rx(port_in->h_port, p->pkts,
port_in->burst_size);
if (n_pkts == 0)
{
p->port_in_next = port_in->next;
return 0;
}
uint32_t rc = 0;
char* rx_pkt = NULL;
for (j = 0; j < n_pkts; j++) {
struct rte_mbuf* m = p->pkts[j];
rx_pkt = rte_pktmbuf_mtod(m, char*);
uint32_t id = rte_be_to_cpu_32(*(uint32_t*)(rx_pkt - sizeof(uint32_t)));
unsigned short packet_len = rte_be_to_cpu_16(*((unsigned short*)(rx_pkt + 16)));
struct flow_state_t* node = &(ps->flow_state_arr[id]);
if (node->hash && node->threshold != 0)
{
// Decide whether to allow of drop the packet
// returns allow - 1, drop - 0
if (do_rate_limit(ps, id, (unsigned char*)(rx_pkt + 14)))
valid_pkts[valid_pkt_count++] = m;
else
invalid_pkts[invalid_pkt_count++] = m;
}
else
valid_pkts[valid_pkt_count++] = m;
if (invalid_pkt_cnt) {
p->pkts_mask = 0;
rte_memcpy(p->pkts, invalid_pkts, sizeof(invalid_pkts));
p->pkts_mask = RTE_LEN2MASK(invalid_pkt_cnt, uint64_t);
rte_pipeline_action_handler_port_bulk_mod(p, p->pkts_mask, port_drop);
}
p->pkts_mask = 0;
memset(p->pkts, 0, sizeof(p->pkts));
if (valid_pkt_cnt != 0)
{
rte_memcpy(p->pkts, valid_pkts, sizeof(valid_pkts));
p->pkts_mask = RTE_LEN2MASK(valid_pkt_cnt, uint64_t);
}
rte_pipeline_action_handler_port_bulk_mod(p, p->pkts_mask, port_out);
/* Pick candidate for next port IN to serve */
p->port_in_next = port_in->next;
return (int)n_pkts;
}
}
结果
- 当从 60000 个源中仅为一个目标生成流量且阈值为 14Mpps 时,没有丢包。我们能够从 IXIA 发送 12Mpps 并接收 12Mpps
- 添加 3 个或更多目的地(每个目的地配置为从 60000 个来源接收流量)后观察到掉线。吞吐量仅为 8-9 Mpps。当发送到 100 个目的地(每个目的地 60000 个 src)时,仅处理了 6.4Mpps。下降了 50%。
- 在 运行 上,通过 vtune-profiler,它报告 rte_hash_lookup_data 为热点并且主要是内存绑定(DRAM 绑定)。我会尽快附上vtune报告。
根据内部测试的更新,rte_hash
库不会导致性能下降。因此,正如评论中所建议的那样,更有可能是由于当前的模式和算法设计可能导致缓存未命中和每个周期的指令较少。
要确定是前端停顿还是后端管道停顿还是内存停顿,请使用 perf
或 vtune
。还要尽量减少分支并使用更多 likely
和 prefetch
。