具有多个回复模板的 Kafka 侦听器容器

Kafka listener container with multiple reply templates

我有一个带有两个已注册处理程序的 Kafka 侦听器,每个处理程序都在侦听关于同一主题但具有不同模式类型的消息。侦听器使用 @SendTo 注释将结果转发到另一个主题,EOS 由 @Transactional.

启用
@KafkaListener(
        groupId = "groupId",
        clientIdPrefix = "kafka-async-api-commands-listener",
        topics = "cmd-topic",
        containerFactory = "kafkaAsyncApiCommandsListenerContainerFactory",
        errorHandler = "kafkaAsyncApiErrorHandler"
)
public class KafkaAsyncApiCommandsListener {

    @KafkaHandler
    @Transactional
    @SendTo("resp-topic")
    Message<FooResponse> Foo(FooCommand command) {
        FooResponse response = new FooResponse(command);
        return MessageBuilder
                .withPayload(response)
                .build();
    }

    @KafkaHandler
    @Transactional
    @SendTo("resp-topic")
    Message<BarResponse> calculateBar(BarCommand command) {
        BarResponse response = new BarResponse(command);
        return MessageBuilder
                .withPayload(response)
                .build();
    }
}

根据 docs:

In order to support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply.

KafkaTemplate 是一种参数化类型,需要提供将要生成的消息的键和值。我在想出同时支持 FooResponseBarResponse 作为消息类型的模板时遇到问题。而且这似乎是必须的,因为容器工厂只接受一个模板。

由于两个模板都需要共享整个配置基础(属性、错误处理程序),我可以实例化一个类型为 KafkaTemplate<UUID, Object> 的模板,该模板能够生成具有 FooResponse 和 [=16= 的消息] 序列化body并注入容器工厂:

@Configuration
public class KafkaAsyncApiProducerTemplatesConfig {

    private final String bootstrapServers;

    public KafkaAsyncApiProducerTemplatesConfig(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Bean("asyncApiResponseTemplate")
    public KafkaTemplate<UUID, Object> asyncApiResponseKafkaTemplate() {
        return new KafkaTemplate<>(kafkaAsyncApiProducerFactory());
    }

    private ProducerFactory<UUID, Object> kafkaAsyncApiProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
}
@Component
public class KafkaAsyncApiCommandsListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<UUID, CalculateCustomerBalanceCommand> {

    private final String bootstrapServers;
    private final KafkaTemplate<UUID, Object> kafkaTemplate;

    public KafkaAsyncApiCommandsListenerContainerFactory(
            @Value("${kafka.bootstrap-servers}") String bootstrapServers,
            @Qualifier("asyncApiResponseTemplate") KafkaTemplate<UUID, Object> kafkaTemplate
    ) {
        super();
        this.bootstrapServers = bootstrapServers;
        this.kafkaTemplate = kafkaTemplate;
        this.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
        this.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 3)));
        this.setReplyTemplate(kafkaTemplate);
    }

    private Map<String, Object> consumerConfig() {
        return new HashMap<String, Object>() {{
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class);
            put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        }};
    }
}

我对我的方法不满意,是否是因为 hacky 通过使用 Object 作为绕过 KafkaTemplate 静态类型的方法value 参数或由于将来可能出现的潜在配置不灵活。假设我想为 FooResponseBarResponse 消息提供截然不同的配置模板。差异将需要不同的生产者配置。使用具有包罗万象的 Object 值类型的单个模板,这是不可能实现的。

有什么方法可以提供多个回复模板给监听器容器,让监听器容器根据消息值类型动态选择? Spring 引导自动配置是否试图以某种方式解决这种用例?我不能在我的项目中使用它,但不介意它的代码中的任何提示。也许只能通过实例化两个单独的侦听器容器(附加不同的回复模板)侦听同一主题来满足此要求?如果后一种方法是正确的,我如何确保跨多个侦听器容器正确传递消息(最好使用恰好一次语义)?

Is there any way to provide more than one replying templates to the listener container to be chosen dynamically based on message value type?

没有

Does Spring Boot auto-configuration somehow attempt to address this kind of use case?

没有

然而,当 return 类型为 Message<?> 时,通用参数无关紧要。参见

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION
     * @see org.springframework.kafka.support.KafkaHeaders#KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

泛型类型仅在使用采用键 and/or 值的发送方法时才有意义。

您可以创建 KafkaTemplate 的子类并重写发送方法以调用具有适当泛型类型的委托模板,但这将毫无意义,除非您 return FooResponse 或直接 BarResponse,而不是将它们组装成 Message<?>.