Spring-kafka监听并发
Spring-kafka listener concurenncy
我已经使用 spring-kafka lib 实现了 Kafka 消费者。
我有一个带有 2 个分区的 Kafka 主题,我还使用 ConcurrentKafkaListenerContainerFactory
并将并发级别设置为 2,因此每个容器实例都应该根据 spring-kafka documentation 从单个分区中使用。
The KafkaMessageListenerContainer receives all message from all
topics/partitions on a single thread. The
ConcurrentMessageListenerContainer delegates to 1 or more
KafkaMessageListenerContainer s to provide multi-threaded consumption.
有我的消费者class:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
如您所见,我有 'hashMap' 集合,其中我根据消息 'customer_id' 属性将我的事件放入相应的队列。
这种功能需要在多线程访问的情况下进行额外的同步,正如我所见spring-kafka 只为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。
如何以编程方式更改此逻辑?
我看到解决此问题的唯一奇怪方法是使用两个 JVM 运行 一个单独的应用程序,其中包含单线程消费者,因此可以使用 #receive 方法访问 KafkaConsumer class将是单线程的。
没错。这就是它的工作原理。该框架实际上不依赖于 bean,而仅依赖于它向函数传递消息的方法。
您可以考虑为主题中的每个分区设置两个 @KafkaListener
方法。一个分区中的记录确实在单个线程中传送到 @KafkaListener
。所以,如果你真的不能忍受那种状态,你可以为每个线程使用两个 HashMap
。
侦听器抽象背后的总体思想正是关于 stateless 行为。 KafkaConsumer
是常规的 Spring singleton bean。你必须接受这个事实并根据这种情况重新设计你的解决方案。
我已经使用 spring-kafka lib 实现了 Kafka 消费者。
我有一个带有 2 个分区的 Kafka 主题,我还使用 ConcurrentKafkaListenerContainerFactory
并将并发级别设置为 2,因此每个容器实例都应该根据 spring-kafka documentation 从单个分区中使用。
The KafkaMessageListenerContainer receives all message from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to 1 or more KafkaMessageListenerContainer s to provide multi-threaded consumption.
有我的消费者class:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
如您所见,我有 'hashMap' 集合,其中我根据消息 'customer_id' 属性将我的事件放入相应的队列。 这种功能需要在多线程访问的情况下进行额外的同步,正如我所见spring-kafka 只为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。
如何以编程方式更改此逻辑?
我看到解决此问题的唯一奇怪方法是使用两个 JVM 运行 一个单独的应用程序,其中包含单线程消费者,因此可以使用 #receive 方法访问 KafkaConsumer class将是单线程的。
没错。这就是它的工作原理。该框架实际上不依赖于 bean,而仅依赖于它向函数传递消息的方法。
您可以考虑为主题中的每个分区设置两个 @KafkaListener
方法。一个分区中的记录确实在单个线程中传送到 @KafkaListener
。所以,如果你真的不能忍受那种状态,你可以为每个线程使用两个 HashMap
。
侦听器抽象背后的总体思想正是关于 stateless 行为。 KafkaConsumer
是常规的 Spring singleton bean。你必须接受这个事实并根据这种情况重新设计你的解决方案。