Kafka Streams中如何发现并过滤掉重复记录

How to discover and filter out duplicate records in Kafka Streams

假设您有一个包含空键的主题,值为

{id:1, name:Chris, age:99}

假设您想按姓名统计人数。你会做如下的事情:

nameStream.groupBy((key,value) -> value.getName())
           .count();

现在让我们说它是有效的,你可以得到重复的记录,你可以根据 id 判断它是重复的。

例如:

{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}

结果应该是 1 并且

   {id:1, name:Chris, age:99}
   {id:2, name:Chris, age:xx}

结果应该是 2。

你会如何做到这一点?我认为 reduce 会起作用,但误解了它是如何起作用的。

您可以使用多个属性进行分组。通过串联创建自定义密钥并作为密钥传递:

KTable<String,String> modifiedTable =  nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);

上面的 KTable 将给出具有给定名称和 ID 的任何记录的更新状态。 所以对于{id:1,name:Chris.....},它将在KTable中只有一条记录:

而在下面的情况下,两条记录都会出现:

<Chris1,  {id:1, name:Chris, age:99}> 
<Chris2,   {id:2, name:Chris, age:xx}> 

现在您想使用名称属性进行计数操作。因此,将键更改为 name 并重新分组 table 并执行 count()。

KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();

这里count()会在KTable之上执行。 KTable 是任何给定 ID 的更新视图。
因此,对于以下输入,modifiedTable 一次将有 1 条记录作为键 "Chris1" 的更新值,您将得到 count=>1

<Chris,1> // Here key will be Chris1

以下输入将导致 **count=>2

{id:1, name:Chris, age:99}  // Here key was be Chris1
{id:2, name:Chris, age:xx}  // Here key was be Chris2