Kafka KStream-KTable加入竞争条件
Kafka KStream-KTable join race condition
我有以下内容:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
streamB 中的消息需要使用 tableA 中的数据进行充实。
示例数据:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
在完美的世界里,我愿意做
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
不幸的是,这对我不起作用,因为我的数据是每次将消息写入主题 A 时,相应的消息也会写入主题 B(源是单个数据库事务)。现在,在这个初始 'creation' 事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上出现几个事件,但对于给定的键,也可能会出现相隔数小时的连续事件。
简单解决方案不起作用的原因是原始 'creation' 事务导致竞争条件:主题 A 和 B 几乎同时收到消息,如果 B 消息到达 'join' 部分首先是拓扑结构(比如在 A 消息到达那里之前的几毫秒),tableA 还不包含相应的条目。此时事件丢失。我可以在主题 C 上看到这种情况:一些事件出现,一些事件不出现(如果我使用 leftJoin,所有事件都会出现,但有些事件具有 null 键,这相当于丢失)。这只是初始 'creation' 交易的问题。之后每有一个事件到达topic B,相应的entry就会存在于tableA中。
所以我的问题是:你如何解决这个问题?
我目前的解决方案很丑陋。我所做的是创建了一个 'collection of B' 并使用
阅读了主题 B
B.groupByKey()
.aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
.join(tableA, ...);
现在我们有一个 KTable-KTable 连接,它不受此竞争条件的影响。我考虑这个 'ugly' 的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息本质上是 "remove the event(s) that I just processed from the collection"。如果此特殊消息未发送到主题 B,集合将继续增长,集合中的每个事件都将在每次加入时报告。
目前我正在研究 window 连接是否有效(将 A 和 B 读入 KStreams 并使用 windowed 连接)。我不确定这是否有效,因为 window 的大小没有上限。我想说,"window starts 1 second 'before' and ends infinity seconds 'after'"。即使我能以某种方式完成这项工作,我还是有点担心 space 具有无界 window.
的要求
如有任何建议,我们将不胜感激。
不确定您使用的是什么版本,但最新的 Kafka 2.1 改进了流-table-连接。即使在 2.1 之前,以下内容也成立:
- stream-table 加入基于事件时间
- Kafka Streams 基于事件时间处理消息,但是,以偏移顺序(对于两个输入流,首先处理记录时间戳较小的流)
- 如果要确保table先更新,table更新记录的时间戳应该小于流记录
从 2.1 开始:
- 为了允许一些延迟,您可以配置
max.task.idle.ms
配置以延迟处理只有一个输入主题有输入数据的情况
事件时间处理顺序在 2.0 和更早版本中以尽力而为的方式实现,这可能会导致您描述的竞争条件。在 2.1 中,处理顺序是有保证的,只有在 max.task.idle.ms
命中时才可能被违反。
我有以下内容:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
streamB 中的消息需要使用 tableA 中的数据进行充实。
示例数据:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
在完美的世界里,我愿意做
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
不幸的是,这对我不起作用,因为我的数据是每次将消息写入主题 A 时,相应的消息也会写入主题 B(源是单个数据库事务)。现在,在这个初始 'creation' 事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上出现几个事件,但对于给定的键,也可能会出现相隔数小时的连续事件。
简单解决方案不起作用的原因是原始 'creation' 事务导致竞争条件:主题 A 和 B 几乎同时收到消息,如果 B 消息到达 'join' 部分首先是拓扑结构(比如在 A 消息到达那里之前的几毫秒),tableA 还不包含相应的条目。此时事件丢失。我可以在主题 C 上看到这种情况:一些事件出现,一些事件不出现(如果我使用 leftJoin,所有事件都会出现,但有些事件具有 null 键,这相当于丢失)。这只是初始 'creation' 交易的问题。之后每有一个事件到达topic B,相应的entry就会存在于tableA中。
所以我的问题是:你如何解决这个问题?
我目前的解决方案很丑陋。我所做的是创建了一个 'collection of B' 并使用
阅读了主题 BB.groupByKey()
.aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
.join(tableA, ...);
现在我们有一个 KTable-KTable 连接,它不受此竞争条件的影响。我考虑这个 'ugly' 的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息本质上是 "remove the event(s) that I just processed from the collection"。如果此特殊消息未发送到主题 B,集合将继续增长,集合中的每个事件都将在每次加入时报告。
目前我正在研究 window 连接是否有效(将 A 和 B 读入 KStreams 并使用 windowed 连接)。我不确定这是否有效,因为 window 的大小没有上限。我想说,"window starts 1 second 'before' and ends infinity seconds 'after'"。即使我能以某种方式完成这项工作,我还是有点担心 space 具有无界 window.
的要求如有任何建议,我们将不胜感激。
不确定您使用的是什么版本,但最新的 Kafka 2.1 改进了流-table-连接。即使在 2.1 之前,以下内容也成立:
- stream-table 加入基于事件时间
- Kafka Streams 基于事件时间处理消息,但是,以偏移顺序(对于两个输入流,首先处理记录时间戳较小的流)
- 如果要确保table先更新,table更新记录的时间戳应该小于流记录
从 2.1 开始:
- 为了允许一些延迟,您可以配置
max.task.idle.ms
配置以延迟处理只有一个输入主题有输入数据的情况
事件时间处理顺序在 2.0 和更早版本中以尽力而为的方式实现,这可能会导致您描述的竞争条件。在 2.1 中,处理顺序是有保证的,只有在 max.task.idle.ms
命中时才可能被违反。