Converting messages between Processor and Sink
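I'm trying to create a sample application that sends a person for processing and then collects all the errors, and I'm using Jackson for serialization.
But my sink fails when it tries to convert the message it receives from the processor. It looks like the message arrives with a contentType other than application/json. What should I do to make this work?
I'm using Spring Cloud Dalston.SR3.
Main: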
@SpringBootApplication
public class PersonStreamApplication {

    @Bean
    public MessageConverter customMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
        return converter;
    }

    public static void main(String[] args) {
        SpringApplication.run(PersonStreamApplication.class, args);
    }
}
Source:
@EnableBinding(PersonSource.Source.class)
public class PersonSource {

    @Bean
    @InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Person> getDate() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("directory", "/dir");
        return () -> createMessage(new Person("joe@joe.com", 28), new MessageHeaders(headers));
    }

    public interface Source {
        String SAMPLE = "sample-source";

        @Output(SAMPLE)
        MessageChannel sampleSource();
    }
}
Processor:
@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {

    private final PersonValidator validator;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Either<String, Person> process(Person person) {
        return this.validator.validate(person).toEither();
    }
}
Sink:
@EnableBinding(PersonSink.Sink.class)
public class PersonSink {

    @Bean
    public IntegrationFlow log() {
        return IntegrationFlows.from(Sink.SAMPLE).log().get();
    }

    public interface Sink {
        String SAMPLE = "sample-sink";

        @Input(SAMPLE)
        SubscribableChannel sampleSink();
    }
}
application.yml
spring:
  cloud:
    stream:
      bindings:
        sample-source:
          destination: testing.stream.input
        input:
          destination: testing.stream.input
        output:
          content-type: application/json
          destination: testing.stream.output
        sample-sink:
          destination: testing.stream.output
Your problem is here:
@Bean
public IntegrationFlow log() {
    return IntegrationFlows.from(Sink.SAMPLE).log().get();
}
You have to use @StreamListener to get Content-Type conversion: https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling
The distinction between @StreamListener and a Spring Integration @ServiceActivator is seen when considering an inbound Message that has a String payload and a contentType header of application/json. In the case of @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String payload into a Vote object.
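To make the distinction in the passage above concrete, here is a minimal, framework-free sketch in plain Java. It is only a model of the idea, not Spring Cloud Stream's implementation: `ContentTypeDemo`, `Message`, `receiveRaw`, `receiveTyped` and the tiny regex-based JSON reader are all hypothetical names introduced for illustration, and real code would let Jackson do the parsing. `receiveRaw` stands in for a plain channel subscriber (such as the IntegrationFlow in the question), while `receiveTyped` stands in for a listener that declares its target type and relies on the contentType header.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Framework-free model of contentType-driven conversion; NOT Spring's implementation.
class ContentTypeDemo {

    // Stand-in for a broker message: a raw String payload plus headers.
    static final class Message {
        final String payload;
        final Map<String, String> headers;

        Message(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Mirrors the Person from the question (email and age).
    static final class Person {
        final String email;
        final int age;

        Person(String email, int age) {
            this.email = email;
            this.age = age;
        }
    }

    // Hypothetical, deliberately tiny JSON reader; real code would delegate to Jackson.
    private static final Pattern PERSON_JSON = Pattern.compile(
            "\\{\\s*\"email\"\\s*:\\s*\"([^\"]*)\"\\s*,\\s*\"age\"\\s*:\\s*(\\d+)\\s*\\}");

    // Like a plain channel subscriber: the payload is handed over untouched.
    static Object receiveRaw(Message message) {
        return message.payload;
    }

    // Like a typed listener: the contentType header decides whether the String is parsed.
    static Person receiveTyped(Message message) {
        String contentType = message.headers.get("contentType");
        if (!"application/json".equals(contentType)) {
            throw new IllegalArgumentException(
                    "Cannot convert payload with contentType " + contentType);
        }
        Matcher m = PERSON_JSON.matcher(message.payload);
        if (!m.matches()) {
            throw new IllegalArgumentException("Payload is not a Person JSON document");
        }
        return new Person(m.group(1), Integer.parseInt(m.group(2)));
    }

    public static void main(String[] args) {
        Message message = new Message(
                "{\"email\":\"joe@joe.com\",\"age\":28}",
                Collections.singletonMap("contentType", "application/json"));

        // The raw subscriber still sees a String.
        System.out.println(receiveRaw(message).getClass().getSimpleName()); // prints "String"

        // The typed listener gets a Person built from the JSON text.
        Person person = receiveTyped(message);
        System.out.println(person.email + " " + person.age); // prints "joe@joe.com 28"
    }
}
```

In the sink from the question, the counterpart of `receiveTyped` would be a method annotated with @StreamListener(Sink.SAMPLE) whose parameter is the target type, in place of the IntegrationFlow bean.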