不确定如何激活 Kafka 消费者 consume() 方法
Unsure how to active a Kafka consumer consume() method
编辑:根本原因是指 属性 路径不存在。 Consumer.java class 现在有这个并且正在工作:
@ConditionalOnProperty(value = "aaronshaver.kafka.consumer-enabled", havingValue = "true")
我有一个简单的 Kafka 设置,其中有一个绝对在生产的生产者。我用这段代码确认了这一点:
SendMessageTask.java
ListenableFuture<SendResult<String, String>> listenableFuture = this.producer.sendMessage("INPUT_DATA", "IN_KEY", LocalDate.now().toString());
SendResult<String, String> result = listenableFuture.get();
logger.info(String.format("\nProduced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d\n", result.getRecordMetadata().topic(), result.getRecordMetadata().offset(), result.getRecordMetadata().partition(), result.getRecordMetadata().serializedValueSize()));
消息显示在控制台中,其中包含发送结果的内容。
我不明白消费者的 Kafka 侦听器方法是如何被调用的/为什么它没有被调用。这是消费者(我尝试了几种不同的方法):
Consumer.java
@Service
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
@KafkaListener(topics = "INPUT_DATA", groupId = "fooGroup")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group fooGroup: " + message);
}
@KafkaListener(topics = {"INPUT_DATA"})
public void consume(
final @Payload String message,
final @Header(KafkaHeaders.OFFSET) Integer offset,
final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
final Acknowledgment acknowledgment
) {
logger.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s", ts, message, offset, key, partition, topic));
acknowledgment.acknowledge();
}
我试过这个配置文件:
*KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"fooGroup");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我觉得应该调用 @KafkaListener 注释方法,但我可能误解了它的工作原理,或者配置有误。
为了更好地衡量,这里是设置了一些 Spring 和 Kafka 属性的文件:
main/resources/application.yml
aaronshaver:
kafka:
consumer-enabled: ${consumer-enabled:true}
spring:
kafka:
bootstrap-servers: ${kafka_bootstrap_servers:localhost:29092}
properties:
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username=${kafka_username:'admin'} password=${kafka_password:'admin-secret'};
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
consumer:
auto-offset-reset: earliest
group-id: aaronshaver
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1
fetch-max-wait: 36000
enable-auto-commit: false
client-id: aaronshaver
producer:
client-id: aaronshaver
key-serializer: org.apache.kafka.common.serializatoin.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 2
jaas:
enabled: true
listener:
poll-timeout: 1800000
concurrency: 1
ack-mode: manual_immediate
有什么想法吗?谢谢!
您的问题在这里:
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
而且没有这样的配置属性。
当我删除这个条件时,它开始使用主题中的记录。
请修改您的应用程序中的配置或逻辑。
编辑:根本原因是指 属性 路径不存在。 Consumer.java class 现在有这个并且正在工作:
@ConditionalOnProperty(value = "aaronshaver.kafka.consumer-enabled", havingValue = "true")
我有一个简单的 Kafka 设置,其中有一个绝对在生产的生产者。我用这段代码确认了这一点:
SendMessageTask.java
ListenableFuture<SendResult<String, String>> listenableFuture = this.producer.sendMessage("INPUT_DATA", "IN_KEY", LocalDate.now().toString());
SendResult<String, String> result = listenableFuture.get();
logger.info(String.format("\nProduced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d\n", result.getRecordMetadata().topic(), result.getRecordMetadata().offset(), result.getRecordMetadata().partition(), result.getRecordMetadata().serializedValueSize()));
消息显示在控制台中,其中包含发送结果的内容。
我不明白消费者的 Kafka 侦听器方法是如何被调用的/为什么它没有被调用。这是消费者(我尝试了几种不同的方法):
Consumer.java
@Service
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
@KafkaListener(topics = "INPUT_DATA", groupId = "fooGroup")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group fooGroup: " + message);
}
@KafkaListener(topics = {"INPUT_DATA"})
public void consume(
final @Payload String message,
final @Header(KafkaHeaders.OFFSET) Integer offset,
final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
final Acknowledgment acknowledgment
) {
logger.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s", ts, message, offset, key, partition, topic));
acknowledgment.acknowledge();
}
我试过这个配置文件:
*KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"fooGroup");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我觉得应该调用 @KafkaListener 注释方法,但我可能误解了它的工作原理,或者配置有误。
为了更好地衡量,这里是设置了一些 Spring 和 Kafka 属性的文件:
main/resources/application.yml
aaronshaver:
kafka:
consumer-enabled: ${consumer-enabled:true}
spring:
kafka:
bootstrap-servers: ${kafka_bootstrap_servers:localhost:29092}
properties:
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username=${kafka_username:'admin'} password=${kafka_password:'admin-secret'};
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
consumer:
auto-offset-reset: earliest
group-id: aaronshaver
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1
fetch-max-wait: 36000
enable-auto-commit: false
client-id: aaronshaver
producer:
client-id: aaronshaver
key-serializer: org.apache.kafka.common.serializatoin.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 2
jaas:
enabled: true
listener:
poll-timeout: 1800000
concurrency: 1
ack-mode: manual_immediate
有什么想法吗?谢谢!
您的问题在这里:
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
而且没有这样的配置属性。
当我删除这个条件时,它开始使用主题中的记录。
请修改您的应用程序中的配置或逻辑。