Spring-kafka监听并发

Spring-kafka listener concurenncy

我已经使用 spring-kafka lib 实现了 Kafka 消费者。 我有一个带有 2 个分区的 Kafka 主题,我还使用 ConcurrentKafkaListenerContainerFactory 并将并发级别设置为 2,因此每个容器实例都应该根据 spring-kafka documentation 从单个分区中使用。

The KafkaMessageListenerContainer receives all message from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to 1 or more KafkaMessageListenerContainer s to provide multi-threaded consumption.

有我的消费者class:

@Component
public class KafkaConsumer {
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
        String message = record.value().toString();
        Event event = EventFactory.createEvent(message);
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // add event to hashMap
        LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queue.add(event);
            hashMap.put(customerId, queue);
        } else {
            queue.add(event);
        }
    }
}

如您所见,我有 'hashMap' 集合,其中我根据消息 'customer_id' 属性将我的事件放入相应的队列。 这种功能需要在多线程访问的情况下进行额外的同步,正如我所见spring-kafka 只为所有容器创建一个 bean 实例,而不是为每个容器创建一个单独的 bean 实例,以避免并发问题。

如何以编程方式更改此逻辑?

我看到解决此问题的唯一奇怪方法是使用两个 JVM 运行 一个单独的应用程序,其中包含单线程消费者,因此可以使用 #receive 方法访问 KafkaConsumer class将是单线程的。

没错。这就是它的工作原理。该框架实际上不依赖于 bean,而仅依赖于它向函数传递消息的方法。

您可以考虑为主题中的每个分区设置两个 @KafkaListener 方法。一个分区中的记录确实在单个线程中传送到 @KafkaListener。所以,如果你真的不能忍受那种状态,你可以为每个线程使用两个 HashMap

侦听器抽象背后的总体思想正是关于 stateless 行为。 KafkaConsumer 是常规的 Spring singleton bean。你必须接受这个事实并根据这种情况重新设计你的解决方案。