如何使用 Spring-Integration Kafka InboundGateway 和 OutboundGateway 与 MessageConverter?
how to use Spring-Integration Kafka InboundGateway and OutboundGateway with a MessageConverter?
- 我的目标:
进程#1:
a) 将 Guava ListMultimap 集合发送到 Kafka OutboundGateway,让网关即时将其转换为 Json 字符串。
b) 从进程 #2 接收答案作为 Json 字符串,并让 OutboundGateway 将其即时转换回 ListMultimap
进程#2:
a) 通过 Kafka InboundGateway 从进程 #1 接收 Json 字符串消息,让 InboundGateway 将其即时转换为 ListMultimap
b) 将 ListMultimap 应答发送回进程 #1,并让 InboundGateway 将其即时转换回 json 字符串。
- 我试过的(伪代码;抱歉,恐怕无法提供完整的源代码):
#process1
RecordMessageConverter converter = new StringJsonMessageConverter(
.JsonMapper.builder()
.addModule(new GuavaModule)
.build());
ReplyingKafkaTemplate<String,Object,Object> template =
new ReplyingKafkatemplate<String,Object,Object>(producerFactory,replyListenerContainer);
template.setMessageConverter(converter); //conversion for submission ?
IntegrationFlow flow =
IntegrationFlows.from(
() -> MessageBuilder.withPayload(someListMultimap).build(),
c -> c.poller(Pollers.cron(cronSchedule)))
.handle(
Kafka.outboundGateway(template)
.replyMessageConverter(converter) //conversion for reception ?
.topic(gatewayTopic)
.channel(outputChannel)
.get();
#process2
RecordMessageConverter converter = new StringJsonMessageConverter(
.JsonMapper.builder()
.addModule(new GuavaModule)
.build());
IntegrationFlows.from(
Kafka.inboundGateway(consumerFactory,containerProperties, producerFactory)
.messageConverter(converter) //conversion for reception ?
.configureTemplate(t -> t.messageConverter(converter))) //conversion for submission ?
.transform(ListMultimap p -> ListMultimap.of("newkey","newvalue2","newkey","newvalue2"))
.get()
- 我有什么问题:
当消息从进程 #1 发送时,我收到以下错误(抱歉,无法提供完整的调用堆栈):
Caused by: java.lang.ClassCastException: class com.google.common.collect.ImmutableListmap cannot be cast to class java.lang.String (com.google.collect.ImmutableListMultimap is in unnamed module of loader 'app' ;...
在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
- 此请求的背景:
我有第二个消息传递库(Kafka 除外),我想在 inbound/outbound 网关内部抽象出负载在传输到另一个进程期间如何转换,应用程序核心处理只有处理 ListMultimaps.
模板的消息转换器被绕过,因为网关直接创建 ProducerRecord
。
这里有一个解决方法:
@Bean
KafkaProducerMessageHandler<String, String> kpmh(StringJsonMessageConverter converter,
ReplyingKafkaTemplate<String, String, String> repTemplate) {
KafkaProducerMessageHandler<String, String> handler = Kafka.outboundGateway(repTemplate)
.replyMessageConverter(converter)
.topic("so68770130").get();
@SuppressWarnings({ "rawtypes", "unchecked" })
ProducerRecordCreator creator = (message, topic, partition, timestamp, key, value, headers) -> {
ProducerRecord temp = converter.fromMessage(message, "so68770130");
return new ProducerRecord(topic, partition, timestamp, key, temp.value(), headers);
};
handler.setProducerRecordCreator(creator);
return handler;
}
@Bean
IntegrationFlow flow(StringJsonMessageConverter converter,
ReplyingKafkaTemplate<String, String, String> repTemplate,
KafkaProducerMessageHandler<String, String> kpmh) {
return f -> f.handle(kpmh)
.log();
}
我打开了一个问题:https://github.com/spring-projects/spring-integration/issues/3617
- 我的目标:
进程#1:
a) 将 Guava ListMultimap 集合发送到 Kafka OutboundGateway,让网关即时将其转换为 Json 字符串。
b) 从进程 #2 接收答案作为 Json 字符串,并让 OutboundGateway 将其即时转换回 ListMultimap
进程#2:
a) 通过 Kafka InboundGateway 从进程 #1 接收 Json 字符串消息,让 InboundGateway 将其即时转换为 ListMultimap
b) 将 ListMultimap 应答发送回进程 #1,并让 InboundGateway 将其即时转换回 json 字符串。
- 我试过的(伪代码;抱歉,恐怕无法提供完整的源代码):
#process1
RecordMessageConverter converter = new StringJsonMessageConverter(
.JsonMapper.builder()
.addModule(new GuavaModule)
.build());
ReplyingKafkaTemplate<String,Object,Object> template =
new ReplyingKafkatemplate<String,Object,Object>(producerFactory,replyListenerContainer);
template.setMessageConverter(converter); //conversion for submission ?
IntegrationFlow flow =
IntegrationFlows.from(
() -> MessageBuilder.withPayload(someListMultimap).build(),
c -> c.poller(Pollers.cron(cronSchedule)))
.handle(
Kafka.outboundGateway(template)
.replyMessageConverter(converter) //conversion for reception ?
.topic(gatewayTopic)
.channel(outputChannel)
.get();
#process2
RecordMessageConverter converter = new StringJsonMessageConverter(
.JsonMapper.builder()
.addModule(new GuavaModule)
.build());
IntegrationFlows.from(
Kafka.inboundGateway(consumerFactory,containerProperties, producerFactory)
.messageConverter(converter) //conversion for reception ?
.configureTemplate(t -> t.messageConverter(converter))) //conversion for submission ?
.transform(ListMultimap p -> ListMultimap.of("newkey","newvalue2","newkey","newvalue2"))
.get()
- 我有什么问题:
当消息从进程 #1 发送时,我收到以下错误(抱歉,无法提供完整的调用堆栈):
Caused by: java.lang.ClassCastException: class com.google.common.collect.ImmutableListmap cannot be cast to class java.lang.String (com.google.collect.ImmutableListMultimap is in unnamed module of loader 'app' ;...
在 org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
- 此请求的背景:
我有第二个消息传递库(Kafka 除外),我想在 inbound/outbound 网关内部抽象出负载在传输到另一个进程期间如何转换,应用程序核心处理只有处理 ListMultimaps.
模板的消息转换器被绕过,因为网关直接创建 ProducerRecord
。
这里有一个解决方法:
@Bean
KafkaProducerMessageHandler<String, String> kpmh(StringJsonMessageConverter converter,
ReplyingKafkaTemplate<String, String, String> repTemplate) {
KafkaProducerMessageHandler<String, String> handler = Kafka.outboundGateway(repTemplate)
.replyMessageConverter(converter)
.topic("so68770130").get();
@SuppressWarnings({ "rawtypes", "unchecked" })
ProducerRecordCreator creator = (message, topic, partition, timestamp, key, value, headers) -> {
ProducerRecord temp = converter.fromMessage(message, "so68770130");
return new ProducerRecord(topic, partition, timestamp, key, temp.value(), headers);
};
handler.setProducerRecordCreator(creator);
return handler;
}
@Bean
IntegrationFlow flow(StringJsonMessageConverter converter,
ReplyingKafkaTemplate<String, String, String> repTemplate,
KafkaProducerMessageHandler<String, String> kpmh) {
return f -> f.handle(kpmh)
.log();
}
我打开了一个问题:https://github.com/spring-projects/spring-integration/issues/3617