Spring-kafka大容量处理

Spring-kafka high volume processing

使用 spring-kafka 1.0.5,我正在使用一个繁忙的主题,该主题有 10 个分区,并发性为 10。

我当前的代码根据分区 ID 将消息添加到队列中,这两个 ID 都保存在 HashMap 中。

@KafkaListener(topics = "${kafka.topic}")
public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
    //Pseudo code
    add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition.
}

不幸的是,该设计所花费的处理时间是简单消耗所花费的处理时间的两倍。

我的要求是单独处理分区,但是如何避免使用基于@KafkaListener 的分区引用的哈希映射。

有没有更有效的方法来解决这个问题?理想情况下,侦听器注释中的每个线程都将管理自己的列表。有没有一种方法可以做到这一点而不需要像上面提到的基于分区ID的hashmap这样的交叉引用?

考虑为每个所需的分区声明几个 @KafkaListener 方法。为此,您应该使用 topicPartitions 属性而不是 topics:

/**
 * Used to add topic/partition information to a {@code KafkaListener}.
 *
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicPartition {