Spring Kafka ConsumerConfig 错误地列出了 value.deserializer 的 StringDeserializer 而不是 KafkaAvroDeserializer
Spring Kafka ConsumerConfig incorrectly lists StringDeserializer instead of KafkaAvroDeserializer for value.deserializer
当使用 spring-kafa
启动 Spring 引导 2.4.5
时,value.deserializer
值显示为 StringDeserializer
而不是 KafkaAvroDeserializer
:
2021-05-12 13:46:05.313 INFO 12632 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
... Elided for brevity ...
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
我的配置:
@Bean
public Map<String, User> consumerConfigAvro() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dnk23");
props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:18081");
props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
return props;
}
@Bean
public ConsumerFactory<String, User> consumerFactoryAvro() {
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(consumerConfigAvro(), false);
return new DefaultKafkaConsumerFactory(consumerConfigAvro(),
new StringDeserializer(),
kafkaAvroDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactoryAvro() {
ConcurrentKafkaListenerContainerFactory<String, User> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryAvro());
return factory;
}
我 运行 进入以下异常,但在花了很多时间后,我认为根本原因是 value.deserializer
设置不正确。
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.sandbox.SandboxApplication.listen3(com.example.sandbox.avro.User)]
Bean [com.example.sandbox.SandboxApplication$$EnhancerBySpringCGLIB$af3650b@5afcde28]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2114) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2001) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:341) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.6.7.jar:2.6.7]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:926) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.7.jar:2.6.7]
... 13 common frames omitted
监听器
@KafkaListener(id = "myId3", topics = "topic3")
public void listen3(@Payload User rec) {
System.out.println(rec.getName());
}
制作人
User user = User.newBuilder()
.setName("Henry Green Engine")
.setNumber(count)
.build();
Message<User> message3 = MessageBuilder
.withPayload(user)
.setHeader(KafkaHeaders.TOPIC, "topic3")
.setHeader(KafkaHeaders.MESSAGE_KEY, "some-key")
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader("X-APP-EVENT", "ApplicationCreatedEvent")
.build();
kafkaTemplate3.send(message3);
Avro 架构
{
"namespace": "com.example.sandbox.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "number", "type": "int"}
]
}
当使用非标准容器工厂 bean 名称(默认为 kafkaListenerContainerFactory
)时,您必须在 @KafkaListener
.
上指定工厂 bean 名称
public @interface KafkaListener {
...
/**
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the container factory bean name.
*/
String containerFactory() default "";
所以..., containerFactory="kafkaListenerContainerFactoryAvro", ...)
当使用 spring-kafa
启动 Spring 引导 2.4.5
时,value.deserializer
值显示为 StringDeserializer
而不是 KafkaAvroDeserializer
:
2021-05-12 13:46:05.313 INFO 12632 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
... Elided for brevity ...
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
我的配置:
@Bean
public Map<String, User> consumerConfigAvro() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dnk23");
props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:18081");
props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
return props;
}
@Bean
public ConsumerFactory<String, User> consumerFactoryAvro() {
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(consumerConfigAvro(), false);
return new DefaultKafkaConsumerFactory(consumerConfigAvro(),
new StringDeserializer(),
kafkaAvroDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactoryAvro() {
ConcurrentKafkaListenerContainerFactory<String, User> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryAvro());
return factory;
}
我 运行 进入以下异常,但在花了很多时间后,我认为根本原因是 value.deserializer
设置不正确。
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.sandbox.SandboxApplication.listen3(com.example.sandbox.avro.User)]
Bean [com.example.sandbox.SandboxApplication$$EnhancerBySpringCGLIB$af3650b@5afcde28]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2114) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2001) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}], failedMessage=GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:341) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.6.7.jar:2.6.7]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.sandbox.avro.User] for GenericMessage [payload= $Henry Green Engine, headers={kafka_offset=18, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@259a8378, kafka_timestampType=CREATE_TIME, X-APP-EVENT=ApplicationCreatedEvent, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some-key, kafka_receivedTopic=topic3, kafka_receivedTimestamp=1620841566436, kafka_groupId=myId3}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:926) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.3.6.jar:5.3.6]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.7.jar:2.6.7]
... 13 common frames omitted
监听器
@KafkaListener(id = "myId3", topics = "topic3")
public void listen3(@Payload User rec) {
System.out.println(rec.getName());
}
制作人
User user = User.newBuilder()
.setName("Henry Green Engine")
.setNumber(count)
.build();
Message<User> message3 = MessageBuilder
.withPayload(user)
.setHeader(KafkaHeaders.TOPIC, "topic3")
.setHeader(KafkaHeaders.MESSAGE_KEY, "some-key")
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader("X-APP-EVENT", "ApplicationCreatedEvent")
.build();
kafkaTemplate3.send(message3);
Avro 架构
{
"namespace": "com.example.sandbox.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "number", "type": "int"}
]
}
当使用非标准容器工厂 bean 名称(默认为 kafkaListenerContainerFactory
)时,您必须在 @KafkaListener
.
public @interface KafkaListener {
...
/**
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the container factory bean name.
*/
String containerFactory() default "";
所以..., containerFactory="kafkaListenerContainerFactoryAvro", ...)