对GlobalKTable的质疑
Doubts on GlobalKTable
我是 Kafka Streams 的新手,我尝试创建一个 poc 看看它是否适合我的用例。
我有一个主题,我正在制作一些参考数据。然后将此数据流式传输并转换为 GlobalKTable CPK(我使用了 GlobalKTable,因为我需要加入非键)。一旦这个处理完成。然后我开始填充另一个主题,然后流式传输 (SPT) 数据并对 CPK 进行内部连接以生成另一个 GlobalKTable (JTK)。
CPK 和 SPT 都是来自外部系统的提要。
现在我有实时数据进来,我需要查找我刚刚填充的参考数据。假设此流称为 "Real"。然后 Real 使用 JTK 进行内部连接,我们实际上得到了很好的结果。
问题是我需要从 CPK 中删除一行。我传递了一个具有空值的键,我希望它从 CPK 中删除这个值,并且他更改为也传播到 JTK。因此,应删除具有该密钥的任何 JTK 记录。但这并没有发生。
这可行吗?我的思考方式正确吗?我应该使用 KSQL 吗?
提前致谢。
感谢@cricket_007的建议。我用的是 KSQL,一切都是。现在工作。所以基本上我首先通过加入 JTK 创建了一个流,它现在是一个带有 SPT 的流(而不是之前的 GKTable)。让我们将其命名为 Joined Stream。之后,我又创建了一个名为 result by 的流。 joining 加入了 table 的 CPK。 (我在KSQL中没有找到GKTable的概念)
我是 Kafka Streams 的新手,我尝试创建一个 poc 看看它是否适合我的用例。
我有一个主题,我正在制作一些参考数据。然后将此数据流式传输并转换为 GlobalKTable CPK(我使用了 GlobalKTable,因为我需要加入非键)。一旦这个处理完成。然后我开始填充另一个主题,然后流式传输 (SPT) 数据并对 CPK 进行内部连接以生成另一个 GlobalKTable (JTK)。
CPK 和 SPT 都是来自外部系统的提要。
现在我有实时数据进来,我需要查找我刚刚填充的参考数据。假设此流称为 "Real"。然后 Real 使用 JTK 进行内部连接,我们实际上得到了很好的结果。
问题是我需要从 CPK 中删除一行。我传递了一个具有空值的键,我希望它从 CPK 中删除这个值,并且他更改为也传播到 JTK。因此,应删除具有该密钥的任何 JTK 记录。但这并没有发生。
这可行吗?我的思考方式正确吗?我应该使用 KSQL 吗?
提前致谢。
感谢@cricket_007的建议。我用的是 KSQL,一切都是。现在工作。所以基本上我首先通过加入 JTK 创建了一个流,它现在是一个带有 SPT 的流(而不是之前的 GKTable)。让我们将其命名为 Joined Stream。之后,我又创建了一个名为 result by 的流。 joining 加入了 table 的 CPK。 (我在KSQL中没有找到GKTable的概念)