Converting messages between Processor and Sink

I'm trying to create a sample application that sends a Person to be processed and then collects all the errors. I'm using Jackson for serialization.

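However, my sink fails when it tries to convert the message it receives from the processor. It looks like the message arrives with a contentType other than application/json. What should I do to make it work?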

I'm using Spring Cloud Dalston.SR3.

Main:

@SpringBootApplication
public class PersonStreamApplication {

    @Bean
    public MessageConverter customMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
        return converter;
    }

    public static void main(String[] args) {
        SpringApplication.run(PersonStreamApplication.class, args);
    }
}

Source:

@EnableBinding(PersonSource.Source.class)
public class PersonSource {

    @Bean
    @InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Person> getDate() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("directory", "/dir");
        return () -> createMessage(new Person("joe@joe.com", 28), new MessageHeaders(headers));
    }

    public interface Source {

        String SAMPLE = "sample-source";

        @Output(SAMPLE)
        MessageChannel sampleSource();
    }
}

Processor:

@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {

    private final PersonValidator validator;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Either<String, Person> process(Person person) {
        return this.validator.validate(person).toEither();
    }
}

Sink:

@EnableBinding(PersonSink.Sink.class)
public class PersonSink {

    @Bean
    public IntegrationFlow log() {
        return IntegrationFlows.from(Sink.SAMPLE).log().get();
    }

    public interface Sink {

        String SAMPLE = "sample-sink";

        @Input(SAMPLE)
        SubscribableChannel sampleSink();
    }
}

application.yml:

spring:
  cloud:
    stream:
      bindings:
        sample-source:
          destination: testing.stream.input
        input:
          destination: testing.stream.input
        output:
          content-type: application/json
          destination: testing.stream.output
        sample-sink:
          destination: testing.stream.output

Your problem is here:

@Bean
public IntegrationFlow log() {
    return IntegrationFlows.from(Sink.SAMPLE).log().get();
}

You must use @StreamListener to get Content-Type conversion: https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling

Quoting the linked documentation (the Vote type is that documentation's own example):

"The distinction between @StreamListener and a Spring Integration @ServiceActivator is seen when considering an inbound Message that has a String payload and a contentType header of application/json. In the case of @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String payload into a Vote object."
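
As a rough sketch of what that could look like for the sink in the question, the IntegrationFlow bean can be replaced with a @StreamListener method. This is untested against the asker's project and rests on assumptions: Person is the asker's own class, Either is taken to be io.vavr.control.Either (the question registers VavrModule), and deserializing the generic Either payload is assumed to go through the custom MappingJackson2MessageConverter bean. If that does not hold, a simpler parameter type such as String may be needed.

```java
import io.vavr.control.Either;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(PersonSink.Sink.class)
public class PersonSink {

    // With @StreamListener the framework uses the message's contentType header
    // to convert the payload into the method parameter type before invoking
    // the method. Assumption: the configured converter can build an Either
    // from the JSON produced by the processor.
    @StreamListener(Sink.SAMPLE)
    public void log(Either<String, Person> result) {
        System.out.println("Received: " + result);
    }

    public interface Sink {

        String SAMPLE = "sample-sink";

        @Input(SAMPLE)
        SubscribableChannel sampleSink();
    }
}
```

The binding interface and channel name are unchanged from the question; only the way the channel is consumed differs.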