为什么我的 Kafka Streams 拓扑 replay/reprocess 不正确?

Why does my Kafka Streams topology does not replay/reprocess correctly?

我的拓扑结构如下所示:

KTable<ByteString, User> users = topology.table(USERS);

KStream<ByteString, JoinRequest> joinRequests = topology.stream(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::userNew)
    .to(USERS);

topology.stream(SETTINGS_CONFIRM_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsConfirm)
    .to(USERS);

topology.stream(SETTINGS_UPDATE_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsUpdate)
    .to(USERS);

此拓扑在运行时运行良好。用户是通过加入请求创建的。他们通过设置确认请求确认他们的设置。他们通过设置更新请求更新他们的设置。

但是,重新处理此拓扑不会产生原始结果。具体来说,设置更新加入者看不到由设置确认加入者产生的用户,即使就时间戳而言,从创建用户到确认用户到用户更新时间已经过去了很多秒他们的设置。

我很茫然。我已经尝试关闭用户 table 的 caching/logging。不知道该怎么做才能正确地重新处理。

KStream-KTable 连接不是 100% 确定的(并且可能永远不会成为 100% 确定的)。我们知道这个问题并讨论解决方案,至少可以缓解这个问题。

一个问题是,如果消费者从代理中获取数据,我们无法轻松控制and/or 为哪些主题对代理returns 数据进行分区。根据我们从代理接收数据的顺序,结果可能会略有不同。

一个相关问题:https://issues.apache.org/jira/browse/KAFKA-3514

此博客 post 也可能有帮助:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

通过将问题中的代码替换为:

,我能够部分解决我的问题
KTable<ByteString, User> users = topology.table(JOIN_REQUESTS)
    .mapValue(entityTopologyProcessor::user)
    .leftJoin(topology
                 .stream(CONFIRM_SETTINGS_REQUESTS)
                 .groupByKey()
                 .reduce((a, b) -> b),
              entityTopologyProcessor::confirmSettings)
    .leftJoin(topology
                 .stream(SETTINGS_UPDATE_REQUESTS)
                 .groupByKey()
                 .reduce(entityTopologyProcessor::settingsUpdateReduce),
              entityTopologyProcessor::settingsUpdate);

此解决方案利用了所有 table-table 连接都是确定性的这一事实。在重新处理过程中,结果状态可能暂时不正确,但一旦拓扑被赶上,最终值就是正确的(给定结果的最终时间戳仍然不确定)。一般来说,这种方法将给定实体(在本例中:用户)的所有事件(在本例中:加入请求、确认设置请求、设置更新请求)分组到一个任务中,并将它们的累积加入到一个产品中.通过在末尾加入另一个流使结果为空,可以使用删除事件扩展此示例。

除此方法外,通常,编写可重新处理的拓扑需要从两个维度考虑拓扑:实时和重新处理时间。从 Kafka Streams 1.0.0 开始,这对开发人员来说是一门艺术。