Is a Kafka Streams topology with the same sink and source topic, plus a join, supported?
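I have a complex Kafka Streams application with 2 fully stateful flows in the same stream:
- It uses an `Execution` topic as its source, enriches the message, and republishes it to the same `Execution` topic.
- It joins another topic, `WorkerTaskResult`, adds the result to the `Execution`, and publishes it back to the `Execution` topic.
The main goal is to provide a workflow system.
The detailed logic is:
- An `Execution` is a list of `TaskRun`.
- The `Execution` looks at the current state of all its `TaskRun` entries and finds the next one to execute.
- If one is found, the `Execution` alters its `TaskRunsList`, adds the next one, and publishes back to Kafka. It also sends the task to be done to another queue (`WorkerTask`).
- The `WorkerTask` is processed outside of Kafka Streams, and the result is published to another queue (`WorkerTaskResult`) with a simple Kafka consumer and producer.
- The `WorkerTaskResult` alters the current `TaskRun` in the current `Execution`, changes its status (mostly RUNNING / SUCCEED / FAILED), and is also published back to the `Execution` queue (with Kafka Streams).
As you can see, the `Execution` (with its `TaskRun` list) is the state of the current application.
The stream works well when all messages are sequential (no concurrency, only one alteration of the `TaskRun` list at a time). When the workflow becomes parallel (concurrent `WorkerTaskResult` records can be joined), my `Execution` state seems to be overwritten, which produces a kind of rollback.
Sample log output: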
2020-04-20 08:05:44,830 INFO reamThread-1 afkaExecutor Stream in with 3264792750: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created
]
)
2020-04-20 08:05:44,881 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO reamThread-1 afkaExecutor Stream out with 1805535461 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
]
)
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor Stream out with 578845055 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
]
)
2020-04-20 08:05:45,153 INFO reamThread-1 afkaExecutor Stream in with 1805535461: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
]
)
2020-04-20 08:05:45,157 INFO reamThread-1 afkaExecutor Stream out with 1889889916 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,209 WARN reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO reamThread-1 afkaExecutor Stream in with 1889889916: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor Stream out with 3651399223 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
]
)
I also get warning messages on the console, such as `Detected out-of-order KTable update for execution at offset 10, partition 7.`
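The rollback visible in the log above can be reproduced in miniature without Kafka. The following is only a toy model under stated assumptions: the class `LostUpdateDemo` and its methods are hypothetical names, the "topic" is a plain list, and the "table" simply keeps the last full snapshot appended to it. It is not the application's code and does not use the Kafka Streams API. It shows two writers that both start from the same stale snapshot, so the later write erases the earlier one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical): a last-write-wins table fed by a topic with two writers. */
final class LostUpdateDemo {

    /** The table keeps only the last full snapshot appended to the topic. */
    static Map<String, String> replay(List<Map<String, String>> topic) {
        return new LinkedHashMap<>(topic.get(topic.size() - 1));
    }

    /** Builds a new full snapshot from the snapshot a writer has read. */
    static Map<String, String> copyWith(Map<String, String> base, String task, String state) {
        Map<String, String> copy = new LinkedHashMap<>(base);
        copy.put(task, state);
        return copy;
    }

    /** Replays the interleaving: both writers derive their output from the same stale read. */
    static Map<String, String> run() {
        Map<String, String> running = new LinkedHashMap<>();
        running.put("parent", "RUNNING");
        running.put("t1", "RUNNING");

        List<Map<String, String>> topic = new ArrayList<>();
        topic.add(running);
        // Writer A (join path) read `running` and records the worker's SUCCESS.
        topic.add(copyWith(running, "t1", "SUCCESS"));
        // Writer B (next-task path) read the same stale `running` and schedules t2.
        topic.add(copyWith(running, "t2", "CREATED"));
        return replay(topic);
    }

    public static void main(String[] args) {
        // prints {parent=RUNNING, t1=RUNNING, t2=CREATED}: the SUCCESS of t1 is gone
        System.out.println(run());
    }
}
```

Because each record carries the whole `TaskRun` list, the last writer wins for every task at once, not only for the task it meant to change.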
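The full source code can be found here.
I also tried many different approaches, like this one here:
- Put `Execution` and `WorkerTaskResult` on the same topic, to ensure that only 1 message for the same key is processed at a time
- And keep the last `Execution` in a StateStore myself (in order to join `WorkerTaskResult` & `Execution`)
- But it felt like I was reinventing a KTable, and it did not work any better
Or this one here:
- Mostly the same as the previous one (keeping the last `Execution` in a StateStore myself)
- But using 2 KStreams, KStream to KStream (removing the KTable).
My questions are:
- Is this pattern (which is not a DAG flow, since we sink to the same topic) supported by Kafka Streams?
- What is a good way to design this stream so that it is concurrency safe?
Any clue is really appreciated; I have been completely stuck for days. Thanks.
Edit 1:
Here is some additional information:
- Only the KStream application publishes new events to `Execution`. No external application publishes to this topic, except for the first event (the creation of the execution).
- There is a `WorkerApp` (an external application, a simple consumer/producer) that consumes from `WorkerTask` (the job to be done) and publishes the result to `WorkerTaskResult` (mostly the current state).
Here is a simplified version of the actual stream: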
Builder
-> Stream 1
- from KStream<WorkerTaskResult>
- join KTable<Execution>
- to Execution topic
-> Stream 2
- from KTable<Execution> (same than previous)
- multiple output
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
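The KStream is mostly an executor-state application that finds the next `WorkerTask` to be done and evaluates whether the flow has ended, so the app can:
- Create a new `TaskRun`
- Change the status of the current `TaskRun` by either:
  - joining `WorkerTaskResult`, or
  - evaluating the whole execution and finding a failed task (based on dependencies)
- Change the status of the execution and publish the final state, SUCCEED or FAILED, which breaks the "infinite loop"
In this version, what is really unclear to me is what `Detected out-of-order KTable update` means in the real world.
Does it mean that a KTable must have a single producer per partition and per key in order to keep the topic ordered?
Edit 2:
In the meantime, I found a new way to think about the stream application that seems to work. The unit tests pass and there are no more `Detected out-of-order` warnings.
Here is the simplified new flow: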
Builder
- from KTable<Execution>
- leftJoin KTable<WorkerTaskResult>
- Branch
- If Join > to Execution topic
- If not joint > continue the flow
- Multiple output (same than previous)
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
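What I think makes sense:
- `WorkerTaskResult` is now a KTable, so I keep only the last version of the result
- I have a single-path flow that outputs to `Execution` (no more 2 paths); I think this is the most important part, and it is what fixes the out-of-order updates
- The whole thing seems to produce only one output per input (1 new value on `Execution` produces 1 new value on the `Execution` topic)
Here is the new topology: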
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
--> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
<-- KSTREAM-SOURCE-0000000000
Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
--> KTABLE-SOURCE-0000000005
Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
--> KTABLE-JOINOTHER-0000000008
<-- KSTREAM-SOURCE-0000000004
Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000005
Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000001
Processor: KTABLE-MERGE-0000000006 (stores: [])
--> KTABLE-TOSTREAM-0000000009
<-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
--> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
<-- KTABLE-MERGE-0000000006
Processor: KSTREAM-FILTER-0000000015 (stores: [])
--> KSTREAM-MAPVALUES-0000000016
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
--> KSTREAM-MAPVALUES-0000000017
<-- KSTREAM-FILTER-0000000015
Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
--> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
<-- KSTREAM-MAPVALUES-0000000016
Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000042 (stores: [])
--> KSTREAM-MAPVALUES-0000000043
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000030 (stores: [])
--> KSTREAM-MAPVALUES-0000000031
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000055 (stores: [])
--> KSTREAM-MAPVALUES-0000000056
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
--> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
<-- KSTREAM-FILTER-0000000042
Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
--> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
<-- KSTREAM-FILTER-0000000030
Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
--> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
<-- KSTREAM-FILTER-0000000055
Processor: KSTREAM-FILTER-0000000024 (stores: [])
--> KSTREAM-MAPVALUES-0000000025
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000032 (stores: [])
--> KSTREAM-MAPVALUES-0000000033
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000044 (stores: [])
--> KSTREAM-MAPVALUES-0000000045
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-FILTER-0000000057 (stores: [])
--> KSTREAM-MAPVALUES-0000000058
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-FILTER-0000000010 (stores: [])
--> KSTREAM-MAPVALUES-0000000011
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-FILTER-0000000019 (stores: [])
--> KSTREAM-MAPVALUES-0000000020
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000050 (stores: [])
--> KSTREAM-MAPVALUES-0000000051
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
--> KSTREAM-FILTER-0000000026
<-- KSTREAM-FILTER-0000000024
Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
--> KSTREAM-MAPVALUES-0000000034
<-- KSTREAM-FILTER-0000000032
Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
--> KSTREAM-MAPVALUES-0000000046
<-- KSTREAM-FILTER-0000000044
Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
--> KSTREAM-MAPVALUES-0000000059
<-- KSTREAM-FILTER-0000000057
Processor: KSTREAM-FILTER-0000000026 (stores: [])
--> KSTREAM-FILTER-0000000027
<-- KSTREAM-MAPVALUES-0000000025
Processor: KSTREAM-FILTER-0000000038 (stores: [])
--> KSTREAM-MAPVALUES-0000000039
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000063 (stores: [])
--> KSTREAM-MAPVALUES-0000000064
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
--> KSTREAM-FILTER-0000000012
<-- KSTREAM-FILTER-0000000010
Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-FILTER-0000000019
Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
--> KSTREAM-FILTER-0000000035
<-- KSTREAM-MAPVALUES-0000000033
Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
--> KSTREAM-FILTER-0000000047
<-- KSTREAM-MAPVALUES-0000000045
Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
--> KSTREAM-FILTER-0000000052
<-- KSTREAM-FILTER-0000000050
Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
--> KSTREAM-FILTER-0000000060
<-- KSTREAM-MAPVALUES-0000000058
Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
--> KSTREAM-FILTER-0000000068
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000012 (stores: [])
--> KSTREAM-PEEK-0000000013
<-- KSTREAM-MAPVALUES-0000000011
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-PEEK-0000000022
<-- KSTREAM-MAPVALUES-0000000020
Processor: KSTREAM-FILTER-0000000027 (stores: [])
--> KSTREAM-PEEK-0000000028
<-- KSTREAM-FILTER-0000000026
Processor: KSTREAM-FILTER-0000000035 (stores: [])
--> KSTREAM-PEEK-0000000036
<-- KSTREAM-MAPVALUES-0000000034
Processor: KSTREAM-FILTER-0000000047 (stores: [])
--> KSTREAM-PEEK-0000000048
<-- KSTREAM-MAPVALUES-0000000046
Processor: KSTREAM-FILTER-0000000052 (stores: [])
--> KSTREAM-PEEK-0000000053
<-- KSTREAM-MAPVALUES-0000000051
Processor: KSTREAM-FILTER-0000000060 (stores: [])
--> KSTREAM-PEEK-0000000061
<-- KSTREAM-MAPVALUES-0000000059
Processor: KSTREAM-FILTER-0000000068 (stores: [])
--> KSTREAM-PEEK-0000000069
<-- KSTREAM-MAPVALUES-0000000067
Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
--> KSTREAM-FILTER-0000000040
<-- KSTREAM-FILTER-0000000038
Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
--> KSTREAM-TRANSFORM-0000000065
<-- KSTREAM-FILTER-0000000063
Processor: KSTREAM-FILTER-0000000040 (stores: [])
--> KSTREAM-SINK-0000000041
<-- KSTREAM-MAPVALUES-0000000039
Processor: KSTREAM-PEEK-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
<-- KSTREAM-FILTER-0000000012
Processor: KSTREAM-PEEK-0000000022 (stores: [])
--> KSTREAM-SINK-0000000023
<-- KSTREAM-FILTER-0000000021
Processor: KSTREAM-PEEK-0000000028 (stores: [])
--> KSTREAM-SINK-0000000029
<-- KSTREAM-FILTER-0000000027
Processor: KSTREAM-PEEK-0000000036 (stores: [])
--> KSTREAM-SINK-0000000037
<-- KSTREAM-FILTER-0000000035
Processor: KSTREAM-PEEK-0000000048 (stores: [])
--> KSTREAM-SINK-0000000049
<-- KSTREAM-FILTER-0000000047
Processor: KSTREAM-PEEK-0000000053 (stores: [])
--> KSTREAM-SINK-0000000054
<-- KSTREAM-FILTER-0000000052
Processor: KSTREAM-PEEK-0000000061 (stores: [])
--> KSTREAM-SINK-0000000062
<-- KSTREAM-FILTER-0000000060
Processor: KSTREAM-PEEK-0000000069 (stores: [])
--> KSTREAM-SINK-0000000070
<-- KSTREAM-FILTER-0000000068
Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
--> KSTREAM-SINK-0000000066
<-- KSTREAM-MAPVALUES-0000000064
Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
--> log-executionStream
<-- KTABLE-SOURCE-0000000001
Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000013
Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000022
Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000028
Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000036
Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
<-- KSTREAM-FILTER-0000000040
Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000048
Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000053
Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000061
Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
<-- KSTREAM-TRANSFORM-0000000065
Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000069
Processor: log-executionStream (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000002
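For now, what is unclear to me is whether this solution is resilient to any concurrency, and whether I can hit out-of-order updates again (meaning the execution is rolled back to an earlier state, which leads to the same task being executed multiple times).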
is this pattern (that is not a dag flow as we sink on the same topic) are supported by KafkaStreams ?
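In general, yes. You just need to make sure that you don't end up in an "infinite loop": at some point an input record should "terminate" and no longer produce anything to the output topic. In your case, an `Execution` should eventually stop creating new `Tasks` (via the feedback loop).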
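The termination condition can be pictured with a small stand-alone model. This is a sketch under assumptions, not Kafka Streams code: `FeedbackLoopDemo`, `next`, and `drain` are hypothetical names, the topic is an in-memory queue, and the three states are taken from the log output. The processor reads from and writes to the same queue, and the loop ends only because a terminal state writes nothing back.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (hypothetical): a processor whose source and sink are the same queue. */
final class FeedbackLoopDemo {
    enum State { CREATED, RUNNING, SUCCESS }

    /** Returns the follow-up record to write back, or null once the state is terminal. */
    static State next(State state) {
        switch (state) {
            case CREATED: return State.RUNNING;
            case RUNNING: return State.SUCCESS;
            default: return null; // terminal: nothing is written back
        }
    }

    /** Drains the queue and returns how many records were processed. */
    static int drain(State first) {
        Deque<State> topic = new ArrayDeque<>();
        topic.add(first);
        int processed = 0;
        while (!topic.isEmpty()) {
            State out = next(topic.poll());
            processed++;
            if (out != null) {
                topic.add(out); // feedback: the output goes back to the input
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(drain(State.CREATED)); // prints 3
    }
}
```

If `next` returned a record for every state, `drain` would never return, which is the "infinite loop" to avoid.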
what is the good way to design this stream to be concurrency safe
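It always depends on the concrete application. In your case, if I understand the design of your application correctly, you basically have two input topics (`Execution` and `WorkerTaskResult`) and two output topics (`Execution` and `WorkerTask`). When processing the input topics, messages from each input may modify shared state (i.e., the state of a task).
Additionally, there is an "outside application" that reads from the `WorkerTask` topic and writes to the `WorkerTaskResult` topic? Hence, there is actually a second loop in your overall data flow? I assume that there are other upstream applications that push new data into the `Execution` topic, too?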
+-----------------+
| |
v |
upstream producers ---> "Execution" --+ |
| |
v |
KS-App --+
^ |
| |
+--> "WorkerTaskResult" --+ +--> "WorkerTask" --+
| |
+------------------------ outside app <----------------+
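What is unclear to me at the moment:
- Which state changes are propagated from KS-App directly back to `Execution`?
- Which state changes are propagated from the "outside app" via `WorkerTaskResult`?
Maybe you can update your question, and I can try to update my answer accordingly.
Update (based on Edit 1 & 2)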
to Execution & to WorkerTask topic (if found a next task)
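This step seems to introduce a race condition? When writing back to the `Execution` topic, the state is only updated once the record is read back. In parallel, the execution of the task may finish first (i.e., before the `Execution` update has been re-read and processed), and thus a second `Execution` update (written when the task finishes) could be applied to the state first?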
On this actual version, what is really unclear for me is what is the meaning of Detected out-of-order KTable update in a real world? Does this mean that a KTable must have a single producer per partition and per key in order to keep order on the topic ?
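One could say so. For every input record, the `table()` operator compares the timestamp of the input record to the timestamp of the current entry in the table. If the input record has a smaller timestamp, a WARN is logged (the update is still applied). The reason for the WARN is that the table stores only one entry per key and expects to only move forward in time. Out-of-order updates may lead to unexpected results, hence the WARN log. Using a single producer per partition, or a single producer per key, would avoid out-of-order data per key (assuming that the producer only sends ordered data).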
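The check described above can be modeled in a few lines. This is a simplified stand-alone sketch, not the Kafka Streams source: `OutOfOrderTableDemo`, `Entry`, and `update` are hypothetical names, the store is a plain `HashMap`, and warnings are collected in a list instead of being logged. It only mirrors the two properties stated in the answer: an older timestamp triggers a warning, and the update is applied anyway.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical) of a table that keeps one timestamped entry per key. */
final class OutOfOrderTableDemo {

    static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    final Map<String, Entry> store = new HashMap<>();
    final List<String> warnings = new ArrayList<>();

    /** Warns when the incoming record is older than the stored one, then overwrites anyway. */
    void update(String key, String value, long timestamp) {
        Entry current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            warnings.add("out-of-order update for " + key);
        }
        store.put(key, new Entry(value, timestamp)); // applied regardless of the warning
    }

    public static void main(String[] args) {
        OutOfOrderTableDemo table = new OutOfOrderTableDemo();
        table.update("execution", "t1=SUCCESS", 200L);
        table.update("execution", "t1=RUNNING", 100L); // older record arrives later
        System.out.println(table.warnings.size());             // prints 1
        System.out.println(table.store.get("execution").value); // prints t1=RUNNING
    }
}
```

The warning therefore signals the rollback but does not prevent it: the older value replaces the newer one.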
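I am not 100% sure at the moment that I fully understand the new version of your application, but in general, you want to make sure to avoid a race and to linearize the updates to `Execution`.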
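One possible reading of "linearize the updates" is sketched below. It is not taken from the answer or from the application and should be treated as an assumption: every change goes through a single entry point as a small per-task event rather than a full snapshot, and a task state may only move forward. `LinearizedExecution`, `apply`, and `snapshot` are hypothetical names, the state order CREATED < RUNNING < SUCCESS is assumed from the log output, and FAILED is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (hypothetical): one reducer owns the state and applies per-task events. */
final class LinearizedExecution {
    enum State { CREATED, RUNNING, SUCCESS }

    private final Map<String, State> taskRuns = new LinkedHashMap<>();

    /** Single entry point for every change; a task state never moves backwards. */
    synchronized void apply(String taskId, State incoming) {
        taskRuns.merge(taskId, incoming,
                (current, next) -> next.compareTo(current) > 0 ? next : current);
    }

    /** Returns a copy so that callers cannot modify the owned state. */
    synchronized Map<String, State> snapshot() {
        return new LinkedHashMap<>(taskRuns);
    }

    public static void main(String[] args) {
        LinearizedExecution execution = new LinearizedExecution();
        execution.apply("t1", State.CREATED);
        execution.apply("t1", State.RUNNING);
        execution.apply("t1", State.SUCCESS);
        execution.apply("t1", State.RUNNING); // stale event, ignored
        execution.apply("t2", State.CREATED);
        System.out.println(execution.snapshot()); // prints {t1=SUCCESS, t2=CREATED}
    }
}
```

With per-task events, a stale record for `t1` cannot erase the creation of `t2`, and the forward-only rule keeps a late RUNNING from undoing a SUCCESS. Whether this fits the real workflow rules (retries, for example) depends on the application.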