Spring 引导服务中具有长任务的 KafkaListener
KafkaListener with long tasks in Spring Boot Service
我有一个消费kafka消息并触发一个长进程的服务。处理一条消息可能需要 10 分钟以上。当前在“doSomething()”方法完成之前不会消耗新消息。如何使它成为并发处理并并行处理消息?
@Service
public class MyService {
@KafkaListener(topics = "request-topic", groupId = "group_id"),
containerFactory = "requirementsKafkaListenerFactory")
private void consumeKafkaRequirementsDataJson(KafkaRequirementsData kafkaRequirementsData) {
System.out.println("Consumed JSON Message from kafka Topic: " + kafkaRequirementsData);
doSomething(kafkaRequirementsData);
}
您可以为此设置并发性 属性,但每个消费者的线程数量受限于该主题的分区数量。如果您已经有相当数量的分区,这可能是可行的方法,但我不会仅仅为了长任务的并行性而增加分区的数量。
您可以尝试在您的应用程序上实现多线程,但管理资源将由您负责,此外还要考虑到一旦一条消息被使用并且线程被启动,您将负责其管理(包括出现问题时的重试)。
有关更多信息,请参阅 https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/ and https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
TL;DR,我的决定基于:
- 如果有足够多的分区可以并行 运行 而不会阻塞您的任务,请设置并发性 属性。
- 如果没有足够的分区,将您的任务排入队列并 运行 使用线程池。
我有一个消费kafka消息并触发一个长进程的服务。处理一条消息可能需要 10 分钟以上。当前在“doSomething()”方法完成之前不会消耗新消息。如何使它成为并发处理并并行处理消息?
@Service
public class MyService {
@KafkaListener(topics = "request-topic", groupId = "group_id"),
containerFactory = "requirementsKafkaListenerFactory")
private void consumeKafkaRequirementsDataJson(KafkaRequirementsData kafkaRequirementsData) {
System.out.println("Consumed JSON Message from kafka Topic: " + kafkaRequirementsData);
doSomething(kafkaRequirementsData);
}
您可以为此设置并发性 属性,但每个消费者的线程数量受限于该主题的分区数量。如果您已经有相当数量的分区,这可能是可行的方法,但我不会仅仅为了长任务的并行性而增加分区的数量。
您可以尝试在您的应用程序上实现多线程,但管理资源将由您负责,此外还要考虑到一旦一条消息被使用并且线程被启动,您将负责其管理(包括出现问题时的重试)。
有关更多信息,请参阅 https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/ and https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
TL;DR,我的决定基于:
- 如果有足够多的分区可以并行 运行 而不会阻塞您的任务,请设置并发性 属性。
- 如果没有足够的分区,将您的任务排入队列并 运行 使用线程池。