samza 任务如何消耗多个 kafka 分区流
How can a samza task consume more than one kafka partitioned streams
我有一个典型的 samza 任务,它使用 2 个主题:data
和 config
,并将来自 config
的消息作为本地状态存储在 rocksdb 中以检查来自 [=10] 的消息是否=] 都可以。
如果这两个主题中的每一个都只有一个分区,则此任务工作正常。一旦我将 data
分成十个分区并且 config
仍然是一个分区,事情就变了。默认情况下,samza 创建十个任务来消耗 data
主题的分区 0 ~ 9,只有任务 0 消耗 config
主题:
task[0] -> config, data[0]
task[1] -> data[1]
...
task[9] -> data[9]
似乎每个任务都使用自己的rocksdb实例初始化,所以只有task[0]将所有配置数据存储在其rocksdb实例中,task[1~9]没有配置数据因此无法找到配置信息传入的数据。
我期望的是每个任务都使用来自其数据分区和配置流的消息,如下所示:
task[0] -> config, data[0]
task[1] -> config, data[1]
...
task[9] -> config, data[9]
有什么办法可以实现吗?
输入流分区的分布由使用 "job.systemstreampartition.grouper.factor" 配置的可插入分组管理。默认情况下,This class 将跨任务实例的传入流分区分组。默认情况下,我相信它执行 GroupByPartitionId。这就是为什么您在 task[0].
中看到 data[0] 和 config[0]
您可以实施自定义 SSPGrouper。但是,您正在寻找的是将 "data" 流视为常规输入流,将 "config" 流视为“广播”输入流。广播意味着 Samza 作业中的每个任务都从该流的分区中读取。这样,每个任务实例都可以使用配置流的数据填充其本地 rocksdb。您可以将广播流配置为:
task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]
对于您的情况,您可以配置:
task.inputs = <systemName>.data
task.broadcast.inputs = <systemName>.config#0
我有一个典型的 samza 任务,它使用 2 个主题:data
和 config
,并将来自 config
的消息作为本地状态存储在 rocksdb 中以检查来自 [=10] 的消息是否=] 都可以。
如果这两个主题中的每一个都只有一个分区,则此任务工作正常。一旦我将 data
分成十个分区并且 config
仍然是一个分区,事情就变了。默认情况下,samza 创建十个任务来消耗 data
主题的分区 0 ~ 9,只有任务 0 消耗 config
主题:
task[0] -> config, data[0]
task[1] -> data[1]
...
task[9] -> data[9]
似乎每个任务都使用自己的rocksdb实例初始化,所以只有task[0]将所有配置数据存储在其rocksdb实例中,task[1~9]没有配置数据因此无法找到配置信息传入的数据。
我期望的是每个任务都使用来自其数据分区和配置流的消息,如下所示:
task[0] -> config, data[0]
task[1] -> config, data[1]
...
task[9] -> config, data[9]
有什么办法可以实现吗?
输入流分区的分布由使用 "job.systemstreampartition.grouper.factor" 配置的可插入分组管理。默认情况下,This class 将跨任务实例的传入流分区分组。默认情况下,我相信它执行 GroupByPartitionId。这就是为什么您在 task[0].
中看到 data[0] 和 config[0]您可以实施自定义 SSPGrouper。但是,您正在寻找的是将 "data" 流视为常规输入流,将 "config" 流视为“广播”输入流。广播意味着 Samza 作业中的每个任务都从该流的分区中读取。这样,每个任务实例都可以使用配置流的数据填充其本地 rocksdb。您可以将广播流配置为:
task.broadcast.inputs=<systemName>.<streamName>#[<partition-range>], <systemName>.<streamName>#[<partition-range>]
对于您的情况,您可以配置:
task.inputs = <systemName>.data
task.broadcast.inputs = <systemName>.config#0