Reactor Kafka - How to disable autostart of a Kafka consumer?
Below is my Kafka consumer:
@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    receiverOptions.schedulerSupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
    receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}
My Kafka consumer starts automatically, but I want to disable that.
I understand that in Spring Kafka we can do something like this:
factory.setAutoStartup(start);
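However, I am not sure how to control the automatic start/stop behavior in Reactor Kafka. I want something like the following.
Introduce a property to control the automatic start/stop behavior: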
@Value("${consumer.autostart:true}")
private boolean start;
Using the property above, I should be able to set an autostart flag in Reactor Kafka, like this:
return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);
Note: .setAutoStart(start);
Is this possible in Reactor Kafka, and if so, how do I do it?
Update:
protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
                    .map(record -> {
                        // This block() subscribes and so triggers processing of the message
                        processMessage(record, topicName, allowedValues).block(
                                Duration.ofSeconds(60L));
                        return record;
                    }).concatMap(message -> {
                        log.info("Received message after processing offset: {} partition: {} ",
                                message.offset(), message.partition());
                        return message.receiverOffset()
                                .commit()
                                .onErrorContinue((t, o) -> log.error(
                                        String.format("exception raised while commit offset %s", o), t)
                                );
                    })).onErrorContinue((t, o) -> {
                try {
                    if (null != o) {
                        ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                        ReceiverOffset offset = record.receiverOffset();
                        log.debug("failed to process message: {} partition: {} and message: {} ",
                                offset.offset(), record.partition(), record.value());
                    }
                    log.error(String.format("exception raised while processing message %s", o), t);
                } catch (Throwable inner) {
                    log.error("encountered error in onErrorContinue", inner);
                }
            }).subscribeOn(scheduler).subscribe();
}
Can I do something like this?
kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if (consumer.autostart) {
    kafkaEventHubInboundReceiverObj.subscribe();
}
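reactor-kafka has no concept of "auto start"; you are in full control.
The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().
Simply defer flux.subscribe() until you are ready to consume data.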
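To make the "defer the subscribe" idea concrete, here is a minimal sketch of a start/stop gate driven by a flag such as the consumer.autostart property from the question. The names ManualStartConsumer, subscribeAction and autoStart are hypothetical and not part of reactor-kafka. The demo uses a plain-JDK stand-in for the pipeline so that it runs without Kafka; with reactor-kafka, the supplier would presumably call subscribe() on the Flux built from receiver.receive() and hand back the returned Disposable's dispose method as the cancel handle.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical helper: keeps a subscribe action and runs it only when start() is called.
 * The Supplier performs the subscription and returns a handle that cancels it.
 */
final class ManualStartConsumer {
    private final Supplier<Runnable> subscribeAction;
    private Runnable cancelHandle; // non-null only while subscribed

    ManualStartConsumer(Supplier<Runnable> subscribeAction, boolean autoStart) {
        this.subscribeAction = subscribeAction;
        if (autoStart) {
            start();
        }
    }

    /** Subscribes unless already running; returns true if this call subscribed. */
    synchronized boolean start() {
        if (cancelHandle != null) {
            return false;
        }
        cancelHandle = subscribeAction.get();
        return true;
    }

    /** Cancels the subscription if running; returns true if this call cancelled it. */
    synchronized boolean stop() {
        if (cancelHandle == null) {
            return false;
        }
        cancelHandle.run();
        cancelHandle = null;
        return true;
    }

    synchronized boolean isRunning() {
        return cancelHandle != null;
    }
}

class DeferredSubscribeDemo {
    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();

        // Stand-in for the real pipeline (assumption). With reactor-kafka this could be:
        //   () -> { Disposable d = pipeline.subscribe(); return d::dispose; }
        Supplier<Runnable> subscribeAction = () -> {
            subscriptions.incrementAndGet();
            return () -> System.out.println("subscription cancelled");
        };

        // autoStart = false, e.g. bound from ${consumer.autostart:true}
        ManualStartConsumer consumer = new ManualStartConsumer(subscribeAction, false);
        System.out.println("running after construction: " + consumer.isRunning());

        consumer.start(); // nothing is consumed before this call
        System.out.println("running after start(): " + consumer.isRunning());

        consumer.stop();
        System.out.println("subscriptions made: " + subscriptions.get());
    }
}
```

Because the supplier runs on every start(), it can rebuild the pipeline from receiver.receive() each time instead of reusing an old Flux. The if (consumer.autostart) check from the question is then just the autoStart constructor argument.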