KStream to KTable Inner Join 每次处理相同数据时产生不同数量的记录

KStream to KTable Inner Join producing different number of records every time processed with same data

我想做一个 KStream 到 KTable Join。仅将 KTable 用作查找 table。 以下步骤显示了代码执行的顺序

  1. 构造KTable

  2. ReKey KTable

  3. 构造KStream

  4. ReKey KStream

  5. 加入KStream-KTable

假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,并假设对于 KStreams 中的每个键,KTable 中都有一条记录。因此预期输出将是 8000 条记录。

每次我第一次加入或启动应用程序时。 预期输出是 8000 条记录,但我有时只看到 6200 条记录,有时看到 8000 条完整的记录集(两次),有时没有记录,等等

我看到的示例建议答案很少

我正在使用 spring 云流作为依赖项。

此外,我在 JIRA 的某处看到了关于此的未解决问题。

below steps shows the sequence in which code is executed

注意,构建拓扑只是提供数据流程序的逻辑描述,并没有"order of execution"不同的算子。程序将被翻译,所有操作符将同时执行。因此,所有主题的数据将被并行读取。

这种并行处理是您观察到的根本原因,即 table 在处理开始之前 不是 首先加载(至少默认情况下不能保证) 因此,即使 table 未完全加载,也可以处理流端数据。

不同主题之间的处理顺序取决于记录时间戳:首先处理时间戳较小的记录。因此,如果要保证先处理KTable数据,就必须保证记录时间戳小于流端记录时间戳。这可以在您将输入数据生成到输入主题时或通过使用自定义时间戳提取器来确保。

其次,从主题中获取数据是不确定的,因此,如果仅返回流端的数据(而不是 table 端数据),则无法进行时间戳比较,因此流端数据将在 table 侧数据之前处理。要解决此问题,您可以增加配置参数 max.task.idle.ms(默认值为 0ms)。如果你增加这个配置(我相信这也是 KSQL 默认情况下所做的)如果一个输入没有数据,任务将阻塞并尝试为空输入获取数据(只有在空闲时间过去后,处理才会继续即使一侧是空的)。

对于 GlobalKTable 行为是不同的。此 table 将在任何处理开始之前在启动时加载。因此,我不确定为什么这对您不起作用。