这个特定的 Kafka Streams 拓扑是否引入了竞争条件?
Does this particular Kafka Streams topology introduce a race condition?
涉及两个主题:commands
那是一个 KStream 的 - 你不会相信! - 命令,和 state
,即 KTABLE(只是普通的,不是 GlobalKTable)。
拓扑看起来像这样:
commands.leftJoin(state, computeNewState).to(state)
即命令作用于当前状态并针对同一主题产生新状态。就功能性编程而言,它有点 command X state -> state
;其中 final state
产生到与初始状态相同的地方。
在我看来,经典的竞争条件隐藏在那里;因为两个(几乎)同时执行的命令可能会产生以下不幸的序列:
command_1
到达并消耗 state_1
;
- 重新计算后,
state_2
是通过应用command_1
产生的;
state_2
反应 to
节点并有效地将 IO 异步到 Kafka 发生...
- ...但还不够快,无法应用;同时
command_2
带有相同的密钥,因此 leftJoin
作用于 state_1
而不是 state_2
只是因为 state_2
尚未交付给 Kafka 并且尚未看到通过 Kafka Streams 实例;
- QED.
我说得对吗?
您描述的是正确的。
也许您可以只使用单个输入主题并使用聚合来修改状态?对于这种情况,状态更新将是同步的。
如果那不可能,我建议退回到处理器 API。您将状态主题读入手动添加的状态存储中。您还将状态存储连接到处理评论主题的处理器——这样,进程可以在直接处理命令时读取 和修改 状态——将任何内容写回状态需要输入主题。
涉及两个主题:commands
那是一个 KStream 的 - 你不会相信! - 命令,和 state
,即 KTABLE(只是普通的,不是 GlobalKTable)。
拓扑看起来像这样:
commands.leftJoin(state, computeNewState).to(state)
即命令作用于当前状态并针对同一主题产生新状态。就功能性编程而言,它有点 command X state -> state
;其中 final state
产生到与初始状态相同的地方。
在我看来,经典的竞争条件隐藏在那里;因为两个(几乎)同时执行的命令可能会产生以下不幸的序列:
command_1
到达并消耗state_1
;- 重新计算后,
state_2
是通过应用command_1
产生的; state_2
反应to
节点并有效地将 IO 异步到 Kafka 发生...- ...但还不够快,无法应用;同时
command_2
带有相同的密钥,因此leftJoin
作用于state_1
而不是state_2
只是因为state_2
尚未交付给 Kafka 并且尚未看到通过 Kafka Streams 实例; - QED.
我说得对吗?
您描述的是正确的。
也许您可以只使用单个输入主题并使用聚合来修改状态?对于这种情况,状态更新将是同步的。
如果那不可能,我建议退回到处理器 API。您将状态主题读入手动添加的状态存储中。您还将状态存储连接到处理评论主题的处理器——这样,进程可以在直接处理命令时读取 和修改 状态——将任何内容写回状态需要输入主题。