具有依赖对象的 Kafka Streams 等待函数
Kafka Streams wait function with depending objects
我创建了一个 Kafka Streams 应用程序,它接收来自不同主题的不同 JSON 对象,我想实现某种等待功能,但我不确定如何最好地实现它。
为了简化问题,我将在下一节中使用简化的实体,希望可以很好地描述问题。
因此,在我的一个流中,我收到了汽车对象,每辆汽车都有一个 ID。在第二个流中,我收到人对象,每个人都有一个汽车 ID,并被分配给具有此 ID 的汽车。
我想使用我的 Kafka Streams 应用程序从两个输入流(主题)读取并用具有相同汽车 ID 的四个人丰富汽车对象。只有当所有四个人都包含在汽车对象中时,汽车对象才应转发给下一个下游处理器。
我计划为汽车创建一个输入流,为人对象创建一个输入流,将 JSON 数据解析为内部对象表示,将两个流合并在一起并应用 "selectKey" 函数在合并流上从实体中提取键。
之后,我会将数据推送到一个自定义转换函数中,该函数包含一个状态存储。在这个转换函数中,我会将每个到达的汽车对象及其 ID 存储在状态存储中。一旦新的人对象到达,我会将它们添加到状态存储中的相应汽车对象(请忽略此处迟到汽车的情况)。一旦有四个人在汽车对象中,我就会将该对象转发到下一个流函数并将汽车对象从状态存储中移除。
这是一个合适的方法吗?我不确定可伸缩性,因为我必须确保当 运行 多个实例时,具有相同 id 的 car 和 person 对象将由同一个应用程序实例处理。我会为此使用 selectKey 函数,这行得通吗?
谢谢!
我觉得基本设计不错。
但是,selectKey()
本身是不够的,因为 transform()
(与 DSL 运营商相反)不会触发自动重新平衡。因此,您需要通过 through()
.
手动重新平衡
stream.selectKey(...)
.through("user-created-topic")
.transform(...);
https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning
我创建了一个 Kafka Streams 应用程序,它接收来自不同主题的不同 JSON 对象,我想实现某种等待功能,但我不确定如何最好地实现它。
为了简化问题,我将在下一节中使用简化的实体,希望可以很好地描述问题。 因此,在我的一个流中,我收到了汽车对象,每辆汽车都有一个 ID。在第二个流中,我收到人对象,每个人都有一个汽车 ID,并被分配给具有此 ID 的汽车。
我想使用我的 Kafka Streams 应用程序从两个输入流(主题)读取并用具有相同汽车 ID 的四个人丰富汽车对象。只有当所有四个人都包含在汽车对象中时,汽车对象才应转发给下一个下游处理器。
我计划为汽车创建一个输入流,为人对象创建一个输入流,将 JSON 数据解析为内部对象表示,将两个流合并在一起并应用 "selectKey" 函数在合并流上从实体中提取键。 之后,我会将数据推送到一个自定义转换函数中,该函数包含一个状态存储。在这个转换函数中,我会将每个到达的汽车对象及其 ID 存储在状态存储中。一旦新的人对象到达,我会将它们添加到状态存储中的相应汽车对象(请忽略此处迟到汽车的情况)。一旦有四个人在汽车对象中,我就会将该对象转发到下一个流函数并将汽车对象从状态存储中移除。
这是一个合适的方法吗?我不确定可伸缩性,因为我必须确保当 运行 多个实例时,具有相同 id 的 car 和 person 对象将由同一个应用程序实例处理。我会为此使用 selectKey 函数,这行得通吗?
谢谢!
我觉得基本设计不错。
但是,selectKey()
本身是不够的,因为 transform()
(与 DSL 运营商相反)不会触发自动重新平衡。因此,您需要通过 through()
.
stream.selectKey(...)
.through("user-created-topic")
.transform(...);
https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning