如何将 Spring Boot HealthIndicator 添加到 Imap 接收器 IntegrationFlow
How to add a Spring Boot HealthIndicator to an Imap receiver IntegrationFlow
如何将 Spring Boot HealthIndicator 与带有 imap 的 IntegrationFlow 轮询电子邮件集成?
我可以通过 errorChannel 从 IntegrationFlow 获取异常,但是一旦 IntegrationFlow 再次开始工作,例如在网络中断后,我如何“清除”异常。
@SpringBootApplication
public class MyApplication {
@Bean
IntegrationFlow mailFlow() {
return IntegrationFlows
.from(Mail.imapInboundAdapter(receiver()).get(),
e -> e.autoStartup(true)
.poller(Pollers.fixedRate(5000)))
.channel(mailChannel()).get();
}
@Bean
public ImapMailReceiver receiver() {
String mailServerPath = format("imaps://%s:%s@%s/INBOX", mailUser,
encode(mailPassword), mailServer);
ImapMailReceiver result = new ImapMailReceiver(mailServerPath);
return result;
}
@Bean
DirectChannel mailChannel() {
return new DirectChannel();
}
@Autowired
@Qualifier("errorChannel")
private PublishSubscribeChannel errorChannel;
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(errorChannel).handle(message -> {
MessagingException ex = (MessagingException) message.getPayload();
log.error("", ex);
}).get();
}
@Bean
HealthIndicator mailReceiverHealthIndicator() {
return () -> {
/*
* How to error check the imap polling ???
*/
return Health.up().build();
};
}
}
我会使用 AtomicReference<Exception>
bean 并在 errorHandlingFlow
中设置它的值。 HealthIndicator
impl 会在 AtomicReference
有值时向 down()
查询。
Mail.imapInboundAdapter()
的 PollerSpec
可以配置为 ReceiveMessageAdvice
:
/**
* Specify AOP {@link Advice}s for the {@code pollingTask}.
* @param advice the {@link Advice}s to use.
* @return the spec.
*/
public PollerSpec advice(Advice... advice) {
它的 afterReceive()
impl 可以清理 AtomicReference
,所以你的 HealthIndicator
会 return up()
.
关键是这个 afterReceive()
只有在 invocation.proceed()
没有异常失败时才会被调用。如果有新消息要处理,它会独立调用。
如何将 Spring Boot HealthIndicator 与带有 imap 的 IntegrationFlow 轮询电子邮件集成?
我可以通过 errorChannel 从 IntegrationFlow 获取异常,但是一旦 IntegrationFlow 再次开始工作,例如在网络中断后,我如何“清除”异常。
@SpringBootApplication
public class MyApplication {
@Bean
IntegrationFlow mailFlow() {
return IntegrationFlows
.from(Mail.imapInboundAdapter(receiver()).get(),
e -> e.autoStartup(true)
.poller(Pollers.fixedRate(5000)))
.channel(mailChannel()).get();
}
@Bean
public ImapMailReceiver receiver() {
String mailServerPath = format("imaps://%s:%s@%s/INBOX", mailUser,
encode(mailPassword), mailServer);
ImapMailReceiver result = new ImapMailReceiver(mailServerPath);
return result;
}
@Bean
DirectChannel mailChannel() {
return new DirectChannel();
}
@Autowired
@Qualifier("errorChannel")
private PublishSubscribeChannel errorChannel;
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(errorChannel).handle(message -> {
MessagingException ex = (MessagingException) message.getPayload();
log.error("", ex);
}).get();
}
@Bean
HealthIndicator mailReceiverHealthIndicator() {
return () -> {
/*
* How to error check the imap polling ???
*/
return Health.up().build();
};
}
}
我会使用 AtomicReference<Exception>
bean 并在 errorHandlingFlow
中设置它的值。 HealthIndicator
impl 会在 AtomicReference
有值时向 down()
查询。
Mail.imapInboundAdapter()
的 PollerSpec
可以配置为 ReceiveMessageAdvice
:
/**
* Specify AOP {@link Advice}s for the {@code pollingTask}.
* @param advice the {@link Advice}s to use.
* @return the spec.
*/
public PollerSpec advice(Advice... advice) {
它的 afterReceive()
impl 可以清理 AtomicReference
,所以你的 HealthIndicator
会 return up()
.
关键是这个 afterReceive()
只有在 invocation.proceed()
没有异常失败时才会被调用。如果有新消息要处理,它会独立调用。