如何使用应用实例 B 删除存储在应用实例 A 的状态存储中的 key/value
How can a key/value stored in app instance A's state store be deleted using app instance B
据我了解,状态存储是每个应用程序实例的本地存储(重新:分区)....来自文档。
Because Kafka Streams partitions the data for processing it, an application’s entire state is spread across the local state stores of the application’s running instances.
我有一个用例,我只需要有包含特定值的任意键(我们称之为 value123)。如果收到另一个 keyB/value123 消息并且 value123 相同但之前有不同的密钥 (keyD),我需要删除旧的 keyD/value123.
问题出在这里 - 我只收到新的 key/value 关联。我没有收到旧密钥的“墓碑”消息 - 因此我必须暗示墓碑,因为新密钥刚刚到达具有相同值的主题。如果 key/value 在另一个应用程序实例的状态存储中,则无法访问(删除)它,因为每个实例的状态都是本地的。我需要驱逐旧数据。我怎样才能做到这一点?
换个角度看:
如果带有键 A 的消息进入转换器并且该转换器的工作是清理状态以确保没有其他键具有该值...假设键 A 的值当前为 'associated'键 B。我需要从 KTable/state 存储中删除键 B,这样键 A 就可以成为唯一与该值关联的东西。我不能保证密钥 B 分配给与密钥 A 相同的分区。如何从密钥 A 所在的分区中删除密钥 B。
Kafka Streams 应用程序的实例可以使用 RPC 进行通信 - https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app。
您可以通过创建自定义 RPC 端点和构建逻辑来查询其他实例以从远程状态存储中删除值(如果找到)。
我可以通过将我的密钥切换到另一个数据点并使用新的 2.5.0 功能通过外键连接 2 个 ktables 来解决我的问题。这将控制输出,因为一旦新记录使用相同的键(但不同的外键)进来,我的另一个 ktable 将不会加入,因为外键已更改。
我用这两个作为资源:
https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html
据我了解,状态存储是每个应用程序实例的本地存储(重新:分区)....来自文档。
Because Kafka Streams partitions the data for processing it, an application’s entire state is spread across the local state stores of the application’s running instances.
我有一个用例,我只需要有包含特定值的任意键(我们称之为 value123)。如果收到另一个 keyB/value123 消息并且 value123 相同但之前有不同的密钥 (keyD),我需要删除旧的 keyD/value123.
问题出在这里 - 我只收到新的 key/value 关联。我没有收到旧密钥的“墓碑”消息 - 因此我必须暗示墓碑,因为新密钥刚刚到达具有相同值的主题。如果 key/value 在另一个应用程序实例的状态存储中,则无法访问(删除)它,因为每个实例的状态都是本地的。我需要驱逐旧数据。我怎样才能做到这一点?
换个角度看:
如果带有键 A 的消息进入转换器并且该转换器的工作是清理状态以确保没有其他键具有该值...假设键 A 的值当前为 'associated'键 B。我需要从 KTable/state 存储中删除键 B,这样键 A 就可以成为唯一与该值关联的东西。我不能保证密钥 B 分配给与密钥 A 相同的分区。如何从密钥 A 所在的分区中删除密钥 B。
Kafka Streams 应用程序的实例可以使用 RPC 进行通信 - https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app。
您可以通过创建自定义 RPC 端点和构建逻辑来查询其他实例以从远程状态存储中删除值(如果找到)。
我可以通过将我的密钥切换到另一个数据点并使用新的 2.5.0 功能通过外键连接 2 个 ktables 来解决我的问题。这将控制输出,因为一旦新记录使用相同的键(但不同的外键)进来,我的另一个 ktable 将不会加入,因为外键已更改。
我用这两个作为资源:
https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html