Spring 为 Kafka 创建云数据流主题
Spring Cloud Data Flow topics creation for Kafka
我有自己的 Spring 云数据流处理器,里面有 Python,我使用这个示例作为指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。
然后我想扩展并创建三个这样的处理器,所以使用 spring.cloud.deployer.myApp.count=3
我创建了 3 个 pods,里面有 Python。
我稍微修改了示例中的代码:当我创建一个Kafka消费者时,我还传递了一个group id,所以消息应该是负载均衡的。
consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])
问题是 SCDF 创建了一个只有 1 个分区的 Kafka 主题,所以消息只到达一个 pod。
所以我想知道:
- 我是否应该以某种方式配置 SCDF 以创建具有 3 个分区的 Kafka 主题?
- 或者我不应该依赖 SCDF 而在 Python 中自己创建主题吗?我想这将是多余的,因为 SCDF 也创建了这个主题。
- SCDF 中的哪个组件实际负责 Kafka 主题创建?我如何影响分区数量?
- 如果我停止此流并以 4 个处理器步骤再次启动,主题是否应该扩展到第 4 个分区?因为目前没有创建新的分区。
请花点时间查看 Spring Cloud Data Flow 的 responsibilities。如果不清楚,SCDF 既不会与 Kafka 等支持消息中间件交互,也不会在运行时使用它。换句话说,SCDF 不会创建与之关联的主题或分区——它只是自动配置 Spring Cloud Stream (SCSt) 属性。
但是,如果您在自定义处理器中使用 SCSt,该框架会自动将所需通道绑定到中间件中的基础主题。该框架还具有更改分区行为的功能。您也可以部署具有过度分区主题的处理器。有 several other configuration options 构建所需的流数据处理行为。
您正在查看的 Python 示例不具备 SCSt 提供的所有功能。该配方是一个示例演练,说明某人如何在 Python 中构建原生处理器样式的应用程序,其中生产者和消费者配置是在 Python 代码本身中手动创建的。 SCDF 和 SCSt 都不会影响此配方中的应用程序行为。
Should I somehow configure SCDF to create a Kafka topic with 3 partitions?
如前所述,SCDF 不与 Kafka 交互。
Or should I not rely on SCDF and create topics on my own in Python? I suppose this will be redundant, since SCDF also creates this topic.
如果您的自定义处理器不是 Spring Cloud Stream 应用程序,是的,您有责任在代码中明确定义主题 + 分区。
What component in SCDF is actually responsible for Kafka topics creation? And how can I influence it regarding number of partitions?
Spring 云流。见上面的解释。
If I stop this stream and launch again with 4 processor steps, should the topic be extended with the 4th partition? Because currently no new partitions get created.
您不一定需要重新启动流数据管道。如果您的主题预先过度分区,是的,运行时的任何其他消费者都应该能够自动参与竞争消费者关系。密切关注 spring-io/dataflow.spring.io#156 — 我们正在添加一个方法来演示使用 SCSt + SCDF + Kafka 进行手动和自动缩放的可能性。
能够通过将以下代码引入 Python 容器启动脚本来解决这个问题,改进了 https://dataflow.spring.io/docs/recipes/polyglot/processor/ 中提供的代码。使用 SCDF 服务器传递的参数获取代理 URL、主题名称、实例数:
admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])
partition_count = get_cmd_arg("spring.cloud.stream.instanceCount")
# create Kafka topic if does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
logging.info(f"Topic {get_input_channel()} was already created")
# add Kafka partitions to existing topic
new_partitions = NewPartitions(total_count=partition_count)
try:
admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError as exp:
logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")
我有自己的 Spring 云数据流处理器,里面有 Python,我使用这个示例作为指导:https://dataflow.spring.io/docs/recipes/polyglot/processor/。
然后我想扩展并创建三个这样的处理器,所以使用 spring.cloud.deployer.myApp.count=3
我创建了 3 个 pods,里面有 Python。
我稍微修改了示例中的代码:当我创建一个Kafka消费者时,我还传递了一个group id,所以消息应该是负载均衡的。
consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])
问题是 SCDF 创建了一个只有 1 个分区的 Kafka 主题,所以消息只到达一个 pod。 所以我想知道:
- 我是否应该以某种方式配置 SCDF 以创建具有 3 个分区的 Kafka 主题?
- 或者我不应该依赖 SCDF 而在 Python 中自己创建主题吗?我想这将是多余的,因为 SCDF 也创建了这个主题。
- SCDF 中的哪个组件实际负责 Kafka 主题创建?我如何影响分区数量?
- 如果我停止此流并以 4 个处理器步骤再次启动,主题是否应该扩展到第 4 个分区?因为目前没有创建新的分区。
请花点时间查看 Spring Cloud Data Flow 的 responsibilities。如果不清楚,SCDF 既不会与 Kafka 等支持消息中间件交互,也不会在运行时使用它。换句话说,SCDF 不会创建与之关联的主题或分区——它只是自动配置 Spring Cloud Stream (SCSt) 属性。
但是,如果您在自定义处理器中使用 SCSt,该框架会自动将所需通道绑定到中间件中的基础主题。该框架还具有更改分区行为的功能。您也可以部署具有过度分区主题的处理器。有 several other configuration options 构建所需的流数据处理行为。
您正在查看的 Python 示例不具备 SCSt 提供的所有功能。该配方是一个示例演练,说明某人如何在 Python 中构建原生处理器样式的应用程序,其中生产者和消费者配置是在 Python 代码本身中手动创建的。 SCDF 和 SCSt 都不会影响此配方中的应用程序行为。
Should I somehow configure SCDF to create a Kafka topic with 3 partitions?
如前所述,SCDF 不与 Kafka 交互。
Or should I not rely on SCDF and create topics on my own in Python? I suppose this will be redundant, since SCDF also creates this topic.
如果您的自定义处理器不是 Spring Cloud Stream 应用程序,是的,您有责任在代码中明确定义主题 + 分区。
What component in SCDF is actually responsible for Kafka topics creation? And how can I influence it regarding number of partitions?
Spring 云流。见上面的解释。
If I stop this stream and launch again with 4 processor steps, should the topic be extended with the 4th partition? Because currently no new partitions get created.
您不一定需要重新启动流数据管道。如果您的主题预先过度分区,是的,运行时的任何其他消费者都应该能够自动参与竞争消费者关系。密切关注 spring-io/dataflow.spring.io#156 — 我们正在添加一个方法来演示使用 SCSt + SCDF + Kafka 进行手动和自动缩放的可能性。
能够通过将以下代码引入 Python 容器启动脚本来解决这个问题,改进了 https://dataflow.spring.io/docs/recipes/polyglot/processor/ 中提供的代码。使用 SCDF 服务器传递的参数获取代理 URL、主题名称、实例数:
admin_client = KafkaAdminClient(bootstrap_servers=[get_kafka_binder_brokers()], client_id=sys.argv[0])
partition_count = get_cmd_arg("spring.cloud.stream.instanceCount")
# create Kafka topic if does not exist
new_topic = NewTopic(name=get_input_channel(), num_partitions=partition_count, replication_factor=1)
try:
admin_client.create_topics(new_topics=[new_topic])
except TopicAlreadyExistsError:
logging.info(f"Topic {get_input_channel()} was already created")
# add Kafka partitions to existing topic
new_partitions = NewPartitions(total_count=partition_count)
try:
admin_client.create_partitions(topic_partitions={get_input_channel(): new_partitions})
except InvalidPartitionsError as exp:
logging.info(f"No need to increase Kafka partitions for topic {get_input_channel()}")