kafka-streams 加入产生重复项

kafka-streams join produce duplicates

我有两个主题:

// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'},
{'id': 2, 'user_id': 2, 'url': 'url#2'},
{'id': 3, 'user_id': 2, 'url': 'url#3'}

// users
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'}

我根据用户创建地图照片

KStream<Integer, Photo> photo_by_user = ...

photo_by_user.to("photo_by_user")

然后,我尝试连接两个表:

KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();

results.to("results");

结果像

{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}
{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}

我看到结果重复了。为什么,以及如何避免它?

您可能遇到了已知错误。在 "flush" KTable-KTable join 上可能会产生一些重复项。请注意,这些重复项严格来说并不是不正确的,因为结果是更新流并且将 "A" 更新为 "A" 不会更改结果。获得这些副本当然是不希望的。尝试禁用缓存——没有缓存,"flush issues" 不应该出现。