如何将 Spring Boot HealthIndicator 添加到 Imap 接收器 IntegrationFlow

How to add a Spring Boot HealthIndicator to an Imap receiver IntegrationFlow

如何将 Spring Boot HealthIndicator 与带有 imap 的 IntegrationFlow 轮询电子邮件集成?

我可以通过 errorChannel 从 IntegrationFlow 获取异常,但是一旦 IntegrationFlow 再次开始工作,例如在网络中断后,我如何“清除”异常。

@SpringBootApplication
public class MyApplication {

    @Bean
    IntegrationFlow mailFlow() {
        return IntegrationFlows
                .from(Mail.imapInboundAdapter(receiver()).get(),
                        e -> e.autoStartup(true)
                                .poller(Pollers.fixedRate(5000)))
                .channel(mailChannel()).get();
    }
    
    @Bean
    public ImapMailReceiver receiver() {
        String mailServerPath = format("imaps://%s:%s@%s/INBOX", mailUser,
                encode(mailPassword), mailServer);
        ImapMailReceiver result = new ImapMailReceiver(mailServerPath);
        return result;
    }

    @Bean
    DirectChannel mailChannel() {
        return new DirectChannel();
    }

    @Autowired
    @Qualifier("errorChannel")
    private PublishSubscribeChannel errorChannel;

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(errorChannel).handle(message -> {
            MessagingException ex = (MessagingException) message.getPayload();
            log.error("", ex);
        }).get();
    }

    @Bean
    HealthIndicator mailReceiverHealthIndicator() {
        return () -> {
            /*
             * How to error check the imap polling ???
             */
            return Health.up().build();
        };
    }

}

我会使用 AtomicReference<Exception> bean 并在 errorHandlingFlow 中设置它的值。 HealthIndicator impl 会在 AtomicReference 有值时向 down() 查询。

Mail.imapInboundAdapter()PollerSpec 可以配置为 ReceiveMessageAdvice:

/**
 * Specify AOP {@link Advice}s for the {@code pollingTask}.
 * @param advice the {@link Advice}s to use.
 * @return the spec.
 */
public PollerSpec advice(Advice... advice) {

它的 afterReceive() impl 可以清理 AtomicReference,所以你的 HealthIndicator 会 return up().

关键是这个 afterReceive() 只有在 invocation.proceed() 没有异常失败时才会被调用。如果有新消息要处理,它会独立调用。