Kafka-Message 的自定义 header 中允许的包

Allowed packages in custom header of Kafka-Message

在 spring-kafka 中,如何从包中添加 类 作为自定义 header 字段进行信任?

消息是这样发送的:

@Autowired
private KafkaTemplate kafkaTemplate;

Message<BouquetMQDTO> m = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.TOPIC, "topic")
            .setHeader("EVENT_TYPE", MessageType.UPSERT)
            .build();
kafkaTemplate.send(m);

接收端是这样的:

@Component
@KafkaListener(topics = "topic")
public class KafkaController {

    @KafkaHandler
    public void listen(
        @Payload Object objectDTO,
        @Header(value = "EVENT_TYPE") MessageType messageType
    ) { 
        System.out.println(messageType);
    }
}

我不断收到的异常是:

Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType]

MessageType 是一个枚举,我可以通过发送字符串表示形式并在接收端使用 valueOf() 来让它工作,但这个解决方案感觉不太对。还有大量教程使用了 java.utils 中的内容,默认情况下是受信任的。

我发现您应该能够声明一个 bean 以允许反序列化枚举:

@Bean
public KafkaHeaderMapper defaultKafkaHeaderMapper() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.addTrustedPackages("my.package");
    return mapper;
}

遗憾的是,这不起作用。例外情况保持不变。我假设我必须声明更多的 bean 并在其中使用 KafkaHeaderMapper bean,但我似乎无法找出它们是哪些。 我也已经有一个 ConsumerFactory bean,我允许将包用作有效负载,但允许包中的枚举也没有做任何事情。

props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
return new DefaultKafkaConsumerFactory<>(props);

JsonDeserializer.TRUSTED_PACKAGES 与 header 完全无关。 这个可以处理`ConsumerRecord的keyvalue。 header 映射器发生在不同的地方。

不确定您是否使用 Spring 启动,但是有一个 MessagingMessageListenerAdapter 带有默认值 MessagingMessageConverter,因此默认值 DefaultKafkaHeaderMapper。要为您自己的 HeaderMapper 自定义,您需要将 MessagingMessageConverter 打包,引用 HeaderMapper 并将该转换器注入 AbstractKafkaListenerContainerFactory bean。

如果你处理Spring Boot,只要声明MessagingMessageConverter就够了,它将auto-configured变成一个由框架创建的AbstractKafkaListenerContainerFactory .

这样您就可以访问您信任的包。但是我认为 3it 还不能工作,因为 enum 默认情况下不是 JSON-friendly:https://www.baeldung.com/jackson-serialize-enums