如何合并 Kafka 主题的输出
How to combine output from Kafka Topics
以下是我的用例场景,其中一个应用程序将数据推送到三个不同的kafka主题(有唯一的应用程序ID),输出将转到后续的队列4和队列5。我已经实现了所示的管道下面。
我面临的唯一问题是如何合并主题 5 中特定 app_id 的所有输出。应用程序推送多个请求,每个请求在此管道中都有一个唯一的 ID。所以对特定 app_id 的所有请求可能没有顺序。队列 5 中可能还有其他 app_id 数据。
在为主题 5 创建消费者时,我是否应该为每个 app_id 使用不同的 group_id?
如果您有任何想法,请帮助我。我正在使用 kafka-python.
from kafka import KafkaConsumer, KafkaProducer
KAFKA = dict()
KAFKA['producer'] = KafkaProducer(bootstrap_servers=[server]))
for queue in ['queue 1', 'queue 2', 'queue 3', 'queue 4', 'queue 5']:
KAFKA['queue'] = KafkaConsumer(queue,
bootstrap_servers=[server],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='group'+queue)
如果您只想一次阅读三个主题,那么您可以 KafkaConsumer('1,2,3')
如果目标是像这样拥有多个主题链,我也会推荐 faust
以下是我的用例场景,其中一个应用程序将数据推送到三个不同的kafka主题(有唯一的应用程序ID),输出将转到后续的队列4和队列5。我已经实现了所示的管道下面。
我面临的唯一问题是如何合并主题 5 中特定 app_id 的所有输出。应用程序推送多个请求,每个请求在此管道中都有一个唯一的 ID。所以对特定 app_id 的所有请求可能没有顺序。队列 5 中可能还有其他 app_id 数据。
在为主题 5 创建消费者时,我是否应该为每个 app_id 使用不同的 group_id?
如果您有任何想法,请帮助我。我正在使用 kafka-python.
from kafka import KafkaConsumer, KafkaProducer
KAFKA = dict()
KAFKA['producer'] = KafkaProducer(bootstrap_servers=[server]))
for queue in ['queue 1', 'queue 2', 'queue 3', 'queue 4', 'queue 5']:
KAFKA['queue'] = KafkaConsumer(queue,
bootstrap_servers=[server],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='group'+queue)
如果您只想一次阅读三个主题,那么您可以 KafkaConsumer('1,2,3')
如果目标是像这样拥有多个主题链,我也会推荐 faust