any/all 服务激活器完成后的输出和错误通道

Output and Error Channel after any/all service activator is done

我想在服务 any/all 服务激活器完成执行后执行一些操作。流程定义如下:

@Bean
public IntegrationFlow myFlow(MessageSource<Object> someMessageSource) {
    return IntegrationFlows.from(someMessageSource,
            c -> c.poller(Pollers.fixedRate(1, SECONDS)
                    .maxMessagesPerPoll(10)))
            .channel(mySourceChannel())
            .get();
}

@Bean
public MessageChannel mySourceChannel() {
    return new ExecutorChannel(executor());
}

两个服务激活器定义为

public static class MyFooActivator {  
    @ServiceActivator(inputChannel = "mySourceChannel")
    public void doSomething(FooTask event) {
      //handle foo task
    }
}

public static class MyBarActivator { 
    @ServiceActivator(inputChannel = "mySourceChannel")
    public void doSomething(BarTask event) {
        // handle bar task
    }
}

如您所见,服务激活器未定义为集成流的一部分,而是通过注释定义的。目标是在 1) 服务激活器执行成功 2) 服务激活器执行不成功时执行一些代码。

如果服务执行器known/defined有流,这可以通过定义另一个流并添加输出通道和错误来实现。不确定,如何在我的情况下实现相同的目标。

首先,您需要了解必须订阅者 mySourceChannel,当第一条消息发送到第一个服务激活器时,您最终会在它们之间进行循环平衡下一个到第二个。以此类推。 所以,你应该确定这正是你从这样的配置中排除的。

要对服务激活器的执行结果进行一些分析,您需要使用 ExpressionEvaluatingRequestHandlerAdvice 。如果需要,它有 successChannelfailureChannel 选项以及适当的表达式。

您可以将对其 bean 的引用注入到 @ServiceActivatoradviceChain 属性中。像这样:

    @Bean
    @ServiceActivator(inputChannel = "myHandlerChannel", adviceChain = "myHandlerAdvice")
    public MessageHandler myHandler() {
        return message -> { };
    }

    @Bean
    public Advice myHandlerAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannel(myHandlerSuccessChannel());
        return advice;
    }

参考手册中有更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice