Kafka reactor - How to disable KAFKA consumer being autostarted?

Below is my Kafka consumer:

@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    // chain the configuration calls: ReceiverOptions methods return the configured options,
    // so discarding the return value can silently drop settings
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs())
            .schedulerSupplier(() -> Schedulers
                    .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)))
            .maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}

My KAFKA consumer is being started automatically, but I want to disable this auto-start behavior.

I understand that in Spring Kafka we can do something like this:

factory.setAutoStartup(start);
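
For reference, that flag is set on the listener container factory. A minimal sketch, assuming a standard Spring Kafka setup (the bean below and its consumerFactory parameter are illustrative, not taken from my configuration):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory,
        @Value("${consumer.autostart:true}") boolean start) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // when false, listener containers built from this factory are created but not started with the context
    factory.setAutoStartup(start);
    return factory;
}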

However, I'm not sure how to achieve this (i.e., control the auto start/stop behavior) in Kafka reactor. I want something like the following:

Introduce a property to handle the auto start/stop behavior:

@Value("${consumer.autostart:true}")
private boolean start;

Using the above property, I should be able to set the KAFKA auto-start flag in Kafka reactor, like this:

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);

Note: .setAutoStart(start);

Is this doable in Kafka reactor, and if so, how do I do it?

Update:

protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                    processMessage(record, topicName, allowedValues).block(
                            Duration.ofSeconds(60L)); // blocking here triggers processing of the message
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",
                         message.offset(), message.partition());
                return message.receiverOffset()
                        .commit()
                        .onErrorContinue((t, o) -> log.error(
                                String.format("exception raised while commit offset %s", o), t)
                        );
            })).onErrorContinue((t, o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("failed to process message: {} partition: {} and message: {} ",
                          offset.offset(), record.partition(), record.value());
            }
            log.error(String.format("exception raised while processing message %s", o), t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue", inner);
        }
    }).subscribeOn(scheduler).subscribe();
}

Can I do something like this?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if (consumer.autostart) {
    kafkaEventHubInboundReceiverObj.subscribe();
}

reactor-kafka has no concept of "auto-start"; you are in complete control.

The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().

Simply delay the flux.subscribe() until you are ready to consume data.
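
To make that concrete, one way to wire the property into the subscription is a Spring SmartLifecycle bean, so the start/stop semantics mirror Spring Kafka's autoStartup. This is a minimal sketch under that assumption; the class name, the trivial commit-only pipeline, and the consumer.autostart property are illustrative, not a reactor-kafka API:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;

@Component
public class InboundConsumerLifecycle implements SmartLifecycle {

    private final KafkaReceiver<String, Object> receiver;
    private final boolean autostart;
    private volatile Disposable subscription;

    public InboundConsumerLifecycle(KafkaReceiver<String, Object> receiver,
                                    @Value("${consumer.autostart:true}") boolean autostart) {
        this.receiver = receiver;
        this.autostart = autostart;
    }

    @Override
    public boolean isAutoStartup() {
        // Spring calls start() during context refresh only when this returns true
        return autostart;
    }

    @Override
    public void start() {
        // the consumer joins the group only once receive() is subscribed
        subscription = receiver.receive()
                .flatMap(record -> record.receiverOffset().commit()) // replace with your real pipeline
                .subscribe();
    }

    @Override
    public void stop() {
        // disposing the subscription cancels the flux and stops the consumer
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
    }

    @Override
    public boolean isRunning() {
        return subscription != null && !subscription.isDisposed();
    }
}

With this in place the consumer can also be started and stopped at runtime by fetching the bean and calling start() or stop() yourself.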