How to use Spring Integration Kafka InboundGateway and OutboundGateway with a MessageConverter?

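  1. My goal:

Process #1:
a) Send a Guava ListMultimap to a Kafka OutboundGateway and let the gateway convert it to a JSON string on the fly.
b) Receive the reply from process #2 as a JSON string and let the OutboundGateway convert it back to a ListMultimap on the fly.

Process #2:
a) Receive the JSON string message from process #1 through a Kafka InboundGateway and let the InboundGateway convert it to a ListMultimap on the fly.
b) Send a ListMultimap reply back to process #1 and let the InboundGateway convert it back to a JSON string on the fly.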

  2. What I tried (pseudocode; sorry, I cannot provide the full source code):

Process #1:

    // JSON converter backed by a Jackson mapper that understands Guava types
    RecordMessageConverter converter = new StringJsonMessageConverter(
            JsonMapper.builder()
                    .addModule(new GuavaModule())
                    .build());

    ReplyingKafkaTemplate<String, Object, Object> template =
            new ReplyingKafkaTemplate<>(producerFactory, replyListenerContainer);
    template.setMessageConverter(converter); // conversion for sending?

    IntegrationFlow flow =
            IntegrationFlows.from(
                    () -> MessageBuilder.withPayload(someListMultimap).build(),
                    c -> c.poller(Pollers.cron(cronSchedule)))
                .handle(
                    Kafka.outboundGateway(template)
                        .replyMessageConverter(converter) // conversion for receiving?
                        .topic(gatewayTopic))
                .channel(outputChannel)
                .get();

Process #2:

    // Same JSON converter as in process #1
    RecordMessageConverter converter = new StringJsonMessageConverter(
            JsonMapper.builder()
                    .addModule(new GuavaModule())
                    .build());

    IntegrationFlows.from(
            Kafka.inboundGateway(consumerFactory, containerProperties, producerFactory)
                .messageConverter(converter) // conversion for receiving?
                .configureTemplate(t -> t.messageConverter(converter))) // conversion for sending?
        .transform(ListMultimap.class,
                p -> ImmutableListMultimap.of("newkey", "newvalue2", "newkey", "newvalue2"))
        .get();

  3. What my problem is:

When the message is sent from process #1, I get the following error (sorry, I cannot provide the full call stack):
Caused by: java.lang.ClassCastException: class com.google.common.collect.ImmutableListMultimap cannot be cast to class java.lang.String (com.google.common.collect.ImmutableListMultimap is in unnamed module of loader 'app' ;...
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)

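  4. Background for this request:

I have a second messaging library (other than Kafka), and I want the inbound/outbound gateways to hide how the payload is converted while it is transported to the other process, so that the core application logic only ever deals with ListMultimaps.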

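The template's message converter is bypassed because the gateway creates the ProducerRecord directly, so the unconverted ListMultimap payload reaches the StringSerializer.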
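As a rough illustration of why a bypassed converter ends in that ClassCastException, here is a minimal sketch in plain Java. It deliberately uses simplified stand-ins and not the real Kafka or Spring classes: `Serializer`, `StringSerializer`, `Producer`, and the `Object::toString` "converter" are all hypothetical and only mimic the shape of the real types, and a plain `Map` stands in for the ListMultimap.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ConverterBypassSketch {

    /** Hypothetical stand-in for a generic serializer; not the Kafka interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Stand-in for a serializer that only accepts String values. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Stand-in for a producer: the value type is erased, so any Object can be passed in. */
    static final class Producer {
        @SuppressWarnings("rawtypes")
        private final Serializer valueSerializer;

        Producer(Serializer<?> valueSerializer) {
            this.valueSerializer = valueSerializer;
        }

        @SuppressWarnings("unchecked")
        byte[] send(Object value) {
            // Raw call: the cast to String happens inside the serializer at runtime.
            return valueSerializer.serialize(value);
        }
    }

    /** Hands the payload to the producer untouched, as if no converter were applied. */
    static byte[] sendWithoutConversion(Producer producer, Object payload) {
        return producer.send(payload);
    }

    /** Applies the converter first, so the serializer only ever sees a String. */
    static byte[] sendWithConversion(Producer producer, Object payload,
            Function<Object, String> converter) {
        return producer.send(converter.apply(payload));
    }

    public static void main(String[] args) {
        Producer producer = new Producer(new StringSerializer());
        Map<String, List<String>> payload = Map.of("key", List.of("v1", "v2"));

        try {
            sendWithoutConversion(producer, payload);
        } catch (ClassCastException e) {
            System.out.println("Unconverted payload rejected: " + e.getClass().getSimpleName());
        }

        // Object::toString is a toy converter standing in for real JSON conversion.
        byte[] bytes = sendWithConversion(producer, payload, Object::toString);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only the ordering: whatever performs the conversion has to run before the record value reaches the serializer.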

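Here is a workaround, which uses a custom ProducerRecordCreator to run the converter and put the converted value into the outgoing record: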

    @Bean
    KafkaProducerMessageHandler<String, String> kpmh(StringJsonMessageConverter converter,
            ReplyingKafkaTemplate<String, String, String> repTemplate) {

        KafkaProducerMessageHandler<String, String> handler = Kafka.outboundGateway(repTemplate)
                        .replyMessageConverter(converter)
                        .topic("so68770130").get();

        @SuppressWarnings({ "rawtypes", "unchecked" })
        ProducerRecordCreator creator = (message, topic, partition, timestamp, key, value, headers) -> {
            ProducerRecord temp = converter.fromMessage(message, "so68770130");
            return new ProducerRecord(topic, partition, timestamp, key, temp.value(), headers);
        };
        handler.setProducerRecordCreator(creator);
        return handler;
    }

    @Bean
    IntegrationFlow flow(StringJsonMessageConverter converter,
            ReplyingKafkaTemplate<String, String, String> repTemplate,
            KafkaProducerMessageHandler<String, String> kpmh) {

        return f -> f.handle(kpmh)
                .log();
    }

I opened an issue: https://github.com/spring-projects/spring-integration/issues/3617