拆分器在异常期间中止,不处理后续消息

Splitter aborts during exception with out processing subsequent messages

我有一个需求,就是拆分消息,一条一条处理。如果任何消息失败,我想将其报告到错误通道并继续处理下一个可用消息

我正在使用 spring 带 1.0.0-SNAPSHOT 的云 aws 流启动器

我用splitter写了一个示例程序

     @Bean
      public MessageChannel channelSplitOne() {
        return new DirectChannel();
      }


      @StreamListener(INTERNAL_CHANNEL)
      public void channelOne(String message) {
        if (message.equals("l")) {
          throw new RuntimeException("Error due to l");
        }
        System.out.println("Internal: " + message);
      }


      @Splitter(inputChannel = Sink.INPUT, outputChannel = INTERNAL_CHANNEL)
      public List<Message> extractItems(Message<String> input) {
        return Arrays.stream(input.getPayload().split(""))
            .map(s -> MessageBuilder.withPayload(s).copyHeaders(input.getHeaders()).build())
            .collect(Collectors.toList());
      }

当我以 Hello 的形式发送消息时,期望的是 'h','e','o'应该处理,但是'l'应该报错。

但是这里'l'之后处理没有恢复

有什么办法可以实现吗

你可以这样做,但是使用 @ServiceActivator 而不是 @StreamListener。第一个有 adviceChain 选项,你可以在其中注入 ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice.

拆分器就像 Java 中的常规循环一样的问题,因此要在错误后继续,我们需要以某种方式添加一个 try...catch 那里。但这已经不是一个分散的责任。所以我们必须把这样的逻辑移到我们有错误问题的地方。