KStream to KTable Inner Join 每次处理相同数据时产生不同数量的记录
KStream to KTable Inner Join producing different number of records every time processed with same data
我想做一个 KStream 到 KTable Join。仅将 KTable 用作查找 table。
以下步骤显示了代码执行的顺序
构造KTable
ReKey KTable
构造KStream
ReKey KStream
加入KStream-KTable
假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,并假设对于 KStreams 中的每个键,KTable 中都有一条记录。因此预期输出将是 8000 条记录。
每次我第一次加入或启动应用程序时。
预期输出是 8000 条记录,但我有时只看到 6200 条记录,有时看到 8000 条完整的记录集(两次),有时没有记录,等等
问题一:为什么每次运行申请时记录都不一致?
在构造 KTable 之前(construct + Rekey),KStreams 被构造并且数据可用于从 KStream 端连接然后连接从 KTable 开始,因此在构造 KTable 之前不会在最终连接中看到数据。构建 KTable 后,我们可以看到剩余记录正在发生连接。
问题二:如何解决join记录不一致的问题?
我尝试使用嵌入式 Kafka 进行 KStream 和 Ktable 连接的测试用例。 KStreams 中有 10 条记录,KTable 中有 3 条记录被使用。当我 运行 第一次测试用例时 没有加入,加入后也没有看到任何数据。当运行 相同的第二次它运行 完美。如果我清除状态存储然后回到零。
问题 3:为什么会出现这种情况?
我尝试使用 KSQL 并且连接工作正常,我得到了 8000 条记录,然后我进入了 KSQL 源代码,我注意到 KSQL 也在执行相同的连接功能。
问题四:KSQL是如何解决的?
我看到的示例建议答案很少
- 使用 GlobalKTable 无效。我得到了同样的不一致连接。
- 使用自定义连接器https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java
没用
我正在使用 spring 云流作为依赖项。
此外,我在 JIRA 的某处看到了关于此的未解决问题。
below steps shows the sequence in which code is executed
注意,构建拓扑只是提供数据流程序的逻辑描述,并没有"order of execution"不同的算子。程序将被翻译,所有操作符将同时执行。因此,所有主题的数据将被并行读取。
这种并行处理是您观察到的根本原因,即 table 在处理开始之前 不是 首先加载(至少默认情况下不能保证) 因此,即使 table 未完全加载,也可以处理流端数据。
不同主题之间的处理顺序取决于记录时间戳:首先处理时间戳较小的记录。因此,如果要保证先处理KTable数据,就必须保证记录时间戳小于流端记录时间戳。这可以在您将输入数据生成到输入主题时或通过使用自定义时间戳提取器来确保。
其次,从主题中获取数据是不确定的,因此,如果仅返回流端的数据(而不是 table 端数据),则无法进行时间戳比较,因此流端数据将在 table 侧数据之前处理。要解决此问题,您可以增加配置参数 max.task.idle.ms
(默认值为 0ms
)。如果你增加这个配置(我相信这也是 KSQL 默认情况下所做的)如果一个输入没有数据,任务将阻塞并尝试为空输入获取数据(只有在空闲时间过去后,处理才会继续即使一侧是空的)。
对于 GlobalKTable
行为是不同的。此 table 将在任何处理开始之前在启动时加载。因此,我不确定为什么这对您不起作用。
我想做一个 KStream 到 KTable Join。仅将 KTable 用作查找 table。 以下步骤显示了代码执行的顺序
构造KTable
ReKey KTable
构造KStream
ReKey KStream
加入KStream-KTable
假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,并假设对于 KStreams 中的每个键,KTable 中都有一条记录。因此预期输出将是 8000 条记录。
每次我第一次加入或启动应用程序时。 预期输出是 8000 条记录,但我有时只看到 6200 条记录,有时看到 8000 条完整的记录集(两次),有时没有记录,等等
问题一:为什么每次运行申请时记录都不一致?
在构造 KTable 之前(construct + Rekey),KStreams 被构造并且数据可用于从 KStream 端连接然后连接从 KTable 开始,因此在构造 KTable 之前不会在最终连接中看到数据。构建 KTable 后,我们可以看到剩余记录正在发生连接。
问题二:如何解决join记录不一致的问题?
我尝试使用嵌入式 Kafka 进行 KStream 和 Ktable 连接的测试用例。 KStreams 中有 10 条记录,KTable 中有 3 条记录被使用。当我 运行 第一次测试用例时 没有加入,加入后也没有看到任何数据。当运行 相同的第二次它运行 完美。如果我清除状态存储然后回到零。
问题 3:为什么会出现这种情况?
我尝试使用 KSQL 并且连接工作正常,我得到了 8000 条记录,然后我进入了 KSQL 源代码,我注意到 KSQL 也在执行相同的连接功能。
问题四:KSQL是如何解决的?
我看到的示例建议答案很少
- 使用 GlobalKTable 无效。我得到了同样的不一致连接。
- 使用自定义连接器https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java 没用
我正在使用 spring 云流作为依赖项。
此外,我在 JIRA 的某处看到了关于此的未解决问题。
below steps shows the sequence in which code is executed
注意,构建拓扑只是提供数据流程序的逻辑描述,并没有"order of execution"不同的算子。程序将被翻译,所有操作符将同时执行。因此,所有主题的数据将被并行读取。
这种并行处理是您观察到的根本原因,即 table 在处理开始之前 不是 首先加载(至少默认情况下不能保证) 因此,即使 table 未完全加载,也可以处理流端数据。
不同主题之间的处理顺序取决于记录时间戳:首先处理时间戳较小的记录。因此,如果要保证先处理KTable数据,就必须保证记录时间戳小于流端记录时间戳。这可以在您将输入数据生成到输入主题时或通过使用自定义时间戳提取器来确保。
其次,从主题中获取数据是不确定的,因此,如果仅返回流端的数据(而不是 table 端数据),则无法进行时间戳比较,因此流端数据将在 table 侧数据之前处理。要解决此问题,您可以增加配置参数 max.task.idle.ms
(默认值为 0ms
)。如果你增加这个配置(我相信这也是 KSQL 默认情况下所做的)如果一个输入没有数据,任务将阻塞并尝试为空输入获取数据(只有在空闲时间过去后,处理才会继续即使一侧是空的)。
对于 GlobalKTable
行为是不同的。此 table 将在任何处理开始之前在启动时加载。因此,我不确定为什么这对您不起作用。