spring集成kafka监听线程并发=分区数时读取多个分区

spring integration kafka listener thread reads multiple partitions when concurrency = partition count

我设置了一个 Spring 集成流程来处理具有 3 个分区的主题,并将侦听器容器的并发设置为 3。正如预期的那样,我看到三个线程处理来自所有 3 个分区的批处理。但是,我看到在某些情况下,其中一个侦听器线程可能会处理包含来自多个分区的消息的单个批次。我的数据在 kafka 中按一个 id 进行分区,以便它可以与其他 id 并发处理,但不能在另一个线程上使用相同的 id(这是我惊讶地观察到的情况)。我通过阅读文档认为每个线程都会被分配一个分区。我正在使用这样的 KafkaMessageDrivenChannelAdapter:

private static final Class<List<MyEvent>> payloadClass = (Class<List<MyEvent>>)(Class) List.class;

public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<String, MyEvent> myChannelAdapterSpec() {
        return Kafka.messageDrivenChannelAdapter(tstatEventConsumerFactory(),
                KafkaMessageDrivenChannelAdapter.ListenerMode.batch, "my-topic") //3 partitions
                .configureListenerContainer(c -> {
                    c.ackMode(ContainerProperties.AckMode.BATCH);
                    c.id(_ID);
                    c.concurrency(3);
                    RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
                            (record, exception) -> log.error("failed to handle record at offset {}: {}",
                                    record.offset(), record.value(), exception),
                            new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
                    );
                    c.errorHandler(errorHandler);
                });
    }
@Bean
public IntegrationFlow myIntegrationFlow() {
        return IntegrationFlows.from(myChannelAdapterSpec())
                .handle(payloadClass, (payload, headers) -> {
                    service.performSink(payload);
                    return null;
                })
                .get();
    }

我该如何设置才能使每个侦听器容器线程仅处理来自一个分区的消息?

But is there additionally a way that I can keep from ever getting a batch with messages from multiple partitions, even if a rebalance does occur?

消费者组不是这样工作的。如果你想拥有一个“粘性”消费者,那么可以考虑使用手动分配。查看基于TopicPartitionOffset... topicPartitions:

的通道适配器工厂
/**
 * Create an initial
 * {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
 * @param consumerFactory the {@link ConsumerFactory}.
 * @param listenerMode the {@link KafkaMessageDrivenChannelAdapter.ListenerMode}.
 * @param topicPartitions the {@link TopicPartitionOffset} vararg.
 * @param <K> the Kafka message key type.
 * @param <V> the Kafka message value type.
 * @return the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
 */
public static <K, V>
KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(
        ConsumerFactory<K, V> consumerFactory,
        KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode,
        TopicPartitionOffset... topicPartitions) {

然后它不会被视为消费者组,您必须创建多个通道适配器,将每个通道适配器指向其特定分区。所有这些通道适配器都可以向相同的 MessageChannel.

发送消息