poll() and commitAsync() in Spring-Kafka
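I am trying to write a Kafka consumer application in Java on the Spring Boot platform. Earlier I wrote the code in plain Java, but I am now converting it to spring-kafka because it offers some advantages over plain Java. I have a few questions I am trying to understand.
It seems that I don't have to write the poll() loop explicitly in spring-kafka. Is it handled automatically by @KafkaListener?
I have set enable.auto.commit='false' because I need to do some processing before committing offsets. How can I perform commitAsync() in spring-kafka?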
ConsumerConfig.java :
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${app.kafka_brokers}")
    private String KAFKA_BROKERS;
    @Value("${app.topic}")
    private String KAFKA_TOPIC;
    @Value("${app.group_id_config}")
    private String GROUP_ID_CONFIG;
    @Value("${app.schema_registry_url}")
    private String SCHEMA_REGISTRY_URL;
    @Value("${app.offset_reset}")
    private String OFFSET_RESET;
    @Value("${app.max_poll_records}")
    private String MAX_POLL_RECORDS;
    @Value("${app.security.protocol}")
    private String SSL_PROTOCOL;
    @Value("${app.ssl.truststore.password}")
    private String SSL_TRUSTSTORE_SECURE;
    @Value("${app.ssl.keystore.password}")
    private String SSL_KEYSTORE_SECURE;
    @Value("${app.ssl.key.password}")
    private String SSL_KEY_SECURE;
    @Value("${app.ssl.truststore.location}")
    private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
    @Value("${app.ssl.keystore.location}")
    private String SSL_KEYSTORE_LOCATION_FILE_NAME;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }
}
KafkaConsumer.java :
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        System.out.println(record);
        // <-- how to commitAsync() here? -->
    }
}
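The listener container commits the offsets when the listener exits normally, depending on the container's AckMode property. AckMode.BATCH (the default) means the offsets for all the records returned by the poll are committed after they have all been processed; AckMode.RECORD means each offset is committed as soon as the listener exits.
Sync vs. async is controlled by the container's syncCommits property.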
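A minimal sketch of how the two properties mentioned above could be set on the question's own container factory. It assumes a reasonably recent spring-kafka version in which AckMode lives in ContainerProperties; the class name AsyncCommitListenerConfig and the logging in the callback are illustrative only, and the code needs spring-kafka on the classpath to compile.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class AsyncCommitListenerConfig { // hypothetical class name

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);

        ContainerProperties props = factory.getContainerProperties();
        // Commit after each record; BATCH is the default. Use MANUAL or
        // MANUAL_IMMEDIATE instead if the listener takes an Acknowledgment.
        props.setAckMode(ContainerProperties.AckMode.RECORD);
        // false -> the container commits with commitAsync() rather than commitSync()
        props.setSyncCommits(false);
        // Optional: observe the result of asynchronous commits
        props.setCommitCallback((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Async commit failed for " + offsets + ": " + exception);
            }
        });
        return factory;
    }
}
```

With this in place the listener method itself stays unchanged; the container runs the poll loop and issues the commits.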
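First, I suggest you use the properties and auto-configuration that Spring Kafka already provides instead of creating your own, since that follows the DRY principle: Don't Repeat Yourself.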
spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false # <---- disable auto committing
    ssl:
      # Note: this key maps to Kafka's ssl.protocol (e.g. TLS). The question's
      # security.protocol value may instead belong under spring.kafka.security.protocol,
      # depending on your Spring Boot version.
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password: ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
    # And other properties
    listener:
      ack-mode: manual # This is what you need
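Essentially, with manual the acknowledgment is queued and the offsets are committed once all the records from the poll have been processed, whereas manual_immediate commits the offset right away when acknowledge() is called on the listener thread. Whether the commit itself is synchronous or asynchronous is still governed by the container's syncCommits property.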
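To make the timing difference concrete, here is a toy, stdlib-only model of when commits would be issued for a single poll under the two manual modes. It does not use spring-kafka at all; the class AckModeSketch and the method commitsForPoll are hypothetical names, and the model assumes the listener acknowledges every record in order. The committed value is the offset of the next record to read, i.e. the last processed offset plus one.

```java
import java.util.ArrayList;
import java.util.List;

public class AckModeSketch {

    public enum Mode { MANUAL, MANUAL_IMMEDIATE }

    /** Returns the offsets that would be committed, in order, for one poll. */
    public static List<Long> commitsForPoll(Mode mode, List<Long> polledOffsets) {
        List<Long> commits = new ArrayList<>();
        long pending = -1; // highest acknowledged offset that is not yet committed
        for (long offset : polledOffsets) {
            // The listener processes the record, then calls acknowledge().
            if (mode == Mode.MANUAL_IMMEDIATE) {
                commits.add(offset + 1); // committed right away
            } else {
                pending = offset;        // queued until the batch is done
            }
        }
        if (mode == Mode.MANUAL && pending >= 0) {
            commits.add(pending + 1);    // one commit after the whole poll
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Long> poll = List.of(5L, 6L, 7L);
        System.out.println(commitsForPoll(Mode.MANUAL, poll));           // [8]
        System.out.println(commitsForPoll(Mode.MANUAL_IMMEDIATE, poll)); // [6, 7, 8]
    }
}
```

The trade-off this illustrates: manual_immediate issues more commits but narrows the window of records that would be redelivered after a crash.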
Then, in your @KafkaListener component, you can inject an org.springframework.kafka.support.Acknowledgment object to acknowledge your messages.
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
        System.out.println(record);
        acknowledgment.acknowledge();
    }
}
Here is the documentation on what can be injected into a @KafkaListener method: https://docs.spring.io/spring-kafka/reference/html/#message-listeners