Kafka Streams中如何发现并过滤掉重复记录
How to discover and filter out duplicate records in Kafka Streams
假设您有一个包含空键的主题,值为
{id:1, name:Chris, age:99}
假设您想按姓名统计人数。你会做如下的事情:
nameStream.groupBy((key,value) -> value.getName())
.count();
现在让我们说它是有效的,你可以得到重复的记录,你可以根据 id 判断它是重复的。
例如:
{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}
结果应该是 1 并且
{id:1, name:Chris, age:99}
{id:2, name:Chris, age:xx}
结果应该是 2。
你会如何做到这一点?我认为 reduce 会起作用,但误解了它是如何起作用的。
您可以使用多个属性进行分组。通过串联创建自定义密钥并作为密钥传递:
KTable<String,String> modifiedTable = nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);
上面的 KTable 将给出具有给定名称和 ID 的任何记录的更新状态。
所以对于{id:1,name:Chris.....}
,它将在KTable中只有一条记录:
而在下面的情况下,两条记录都会出现:
<Chris1, {id:1, name:Chris, age:99}>
<Chris2, {id:2, name:Chris, age:xx}>
现在您想使用名称属性进行计数操作。因此,将键更改为 name 并重新分组 table 并执行 count()。
KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();
这里count()会在KTable之上执行。 KTable 是任何给定 ID 的更新视图。
因此,对于以下输入,modifiedTable 一次将有 1 条记录作为键 "Chris1" 的更新值,您将得到 count=>1
<Chris,1> // Here key will be Chris1
以下输入将导致 **count=>2
{id:1, name:Chris, age:99} // Here key was be Chris1
{id:2, name:Chris, age:xx} // Here key was be Chris2
假设您有一个包含空键的主题,值为
{id:1, name:Chris, age:99}
假设您想按姓名统计人数。你会做如下的事情:
nameStream.groupBy((key,value) -> value.getName())
.count();
现在让我们说它是有效的,你可以得到重复的记录,你可以根据 id 判断它是重复的。
例如:
{id:1, name:Chris, age:99}
{id:1, name:Chris, age:xx}
结果应该是 1 并且
{id:1, name:Chris, age:99}
{id:2, name:Chris, age:xx}
结果应该是 2。
你会如何做到这一点?我认为 reduce 会起作用,但误解了它是如何起作用的。
您可以使用多个属性进行分组。通过串联创建自定义密钥并作为密钥传递:
KTable<String,String> modifiedTable = nameStream.groupBy((key,value) -> value.getName()+value.getId()).reduce((aggVal,newval) -> aggVal);
上面的 KTable 将给出具有给定名称和 ID 的任何记录的更新状态。
所以对于{id:1,name:Chris.....}
,它将在KTable中只有一条记录:
而在下面的情况下,两条记录都会出现:
<Chris1, {id:1, name:Chris, age:99}>
<Chris2, {id:2, name:Chris, age:xx}>
现在您想使用名称属性进行计数操作。因此,将键更改为 name 并重新分组 table 并执行 count()。
KTable countTable = modifiedTable.groupBy((k,v)-> KeyValue.pair(v.getName(), v)).count();
这里count()会在KTable之上执行。 KTable 是任何给定 ID 的更新视图。
因此,对于以下输入,modifiedTable 一次将有 1 条记录作为键 "Chris1" 的更新值,您将得到 count=>1
<Chris,1> // Here key will be Chris1
以下输入将导致 **count=>2
{id:1, name:Chris, age:99} // Here key was be Chris1
{id:2, name:Chris, age:xx} // Here key was be Chris2