Kafka-Message 的自定义 header 中允许的包
Allowed packages in custom header of Kafka-Message
在 spring-kafka 中,如何从包中添加 类 作为自定义 header 字段进行信任?
消息是这样发送的:
@Autowired
private KafkaTemplate kafkaTemplate;
Message<BouquetMQDTO> m = MessageBuilder
.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, "topic")
.setHeader("EVENT_TYPE", MessageType.UPSERT)
.build();
kafkaTemplate.send(m);
接收端是这样的:
@Component
@KafkaListener(topics = "topic")
public class KafkaController {
@KafkaHandler
public void listen(
@Payload Object objectDTO,
@Header(value = "EVENT_TYPE") MessageType messageType
) {
System.out.println(messageType);
}
}
我不断收到的异常是:
Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType]
MessageType 是一个枚举,我可以通过发送字符串表示形式并在接收端使用 valueOf() 来让它工作,但这个解决方案感觉不太对。还有大量教程使用了 java.utils 中的内容,默认情况下是受信任的。
我发现您应该能够声明一个 bean 以允许反序列化枚举:
@Bean
public KafkaHeaderMapper defaultKafkaHeaderMapper() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.addTrustedPackages("my.package");
return mapper;
}
遗憾的是,这不起作用。例外情况保持不变。我假设我必须声明更多的 bean 并在其中使用 KafkaHeaderMapper bean,但我似乎无法找出它们是哪些。
我也已经有一个 ConsumerFactory bean,我允许将包用作有效负载,但允许包中的枚举也没有做任何事情。
props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
return new DefaultKafkaConsumerFactory<>(props);
JsonDeserializer.TRUSTED_PACKAGES
与 header 完全无关。
这个可以处理`ConsumerRecord的key
或value
。 header 映射器发生在不同的地方。
不确定您是否使用 Spring 启动,但是有一个 MessagingMessageListenerAdapter
带有默认值 MessagingMessageConverter
,因此默认值 DefaultKafkaHeaderMapper
。要为您自己的 HeaderMapper
自定义,您需要将 MessagingMessageConverter
打包,引用 HeaderMapper
并将该转换器注入 AbstractKafkaListenerContainerFactory
bean。
如果你处理Spring Boot,只要声明MessagingMessageConverter
就够了,它将auto-configured变成一个由框架创建的AbstractKafkaListenerContainerFactory
.
这样您就可以访问您信任的包。但是我认为 3it 还不能工作,因为 enum
默认情况下不是 JSON-friendly:https://www.baeldung.com/jackson-serialize-enums
在 spring-kafka 中,如何从包中添加 类 作为自定义 header 字段进行信任?
消息是这样发送的:
@Autowired
private KafkaTemplate kafkaTemplate;
Message<BouquetMQDTO> m = MessageBuilder
.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, "topic")
.setHeader("EVENT_TYPE", MessageType.UPSERT)
.build();
kafkaTemplate.send(m);
接收端是这样的:
@Component
@KafkaListener(topics = "topic")
public class KafkaController {
@KafkaHandler
public void listen(
@Payload Object objectDTO,
@Header(value = "EVENT_TYPE") MessageType messageType
) {
System.out.println(messageType);
}
}
我不断收到的异常是:
Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType]
MessageType 是一个枚举,我可以通过发送字符串表示形式并在接收端使用 valueOf() 来让它工作,但这个解决方案感觉不太对。还有大量教程使用了 java.utils 中的内容,默认情况下是受信任的。
我发现您应该能够声明一个 bean 以允许反序列化枚举:
@Bean
public KafkaHeaderMapper defaultKafkaHeaderMapper() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.addTrustedPackages("my.package");
return mapper;
}
遗憾的是,这不起作用。例外情况保持不变。我假设我必须声明更多的 bean 并在其中使用 KafkaHeaderMapper bean,但我似乎无法找出它们是哪些。 我也已经有一个 ConsumerFactory bean,我允许将包用作有效负载,但允许包中的枚举也没有做任何事情。
props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
return new DefaultKafkaConsumerFactory<>(props);
JsonDeserializer.TRUSTED_PACKAGES
与 header 完全无关。
这个可以处理`ConsumerRecord的key
或value
。 header 映射器发生在不同的地方。
不确定您是否使用 Spring 启动,但是有一个 MessagingMessageListenerAdapter
带有默认值 MessagingMessageConverter
,因此默认值 DefaultKafkaHeaderMapper
。要为您自己的 HeaderMapper
自定义,您需要将 MessagingMessageConverter
打包,引用 HeaderMapper
并将该转换器注入 AbstractKafkaListenerContainerFactory
bean。
如果你处理Spring Boot,只要声明MessagingMessageConverter
就够了,它将auto-configured变成一个由框架创建的AbstractKafkaListenerContainerFactory
.
这样您就可以访问您信任的包。但是我认为 3it 还不能工作,因为 enum
默认情况下不是 JSON-friendly:https://www.baeldung.com/jackson-serialize-enums