在 Spring 集成中为 ContentEnricher 定义异常回退行为
Defining fallback behavior on exception for a ContentEnricher in Spring Integration
我已经通过 Spring 集成定义了一个很好的流程,在标称情况下,它按照我想要的方式工作。
但是我还没有找到一种方法来定义处理错误的行为(即将输入行标记为失败)
这是我得到的错误:
[ask-scheduler-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to an exception while sending the request message:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [outboundGateway]; nested exception is...
这是我的流配置(简化):
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Autowired
private DataSource dataSource;
@Bean
public IntegrationFlow mainFlow() {
//noinspection unchecked
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(5000)
.transactional(transactionManager())))
.split()
.enrich(e -> e
.requestChannel(subChannel())
.requestPayload(Message::getPayload)
.propertyExpression("fooId", "payload.id"))
.handle(barHandler())
.get();
}
@Bean
public IntegrationFlow enricherFlow() {
return IntegrationFlows.from(subChannel())
.handle(outboundGateway())
.get();
}
@Bean
public MessageChannel subChannel() {
return new DirectChannel();
}
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource,
"select * from FOO");
}
@Bean
public JdbcOutboundGateway outboundGateway() {
...
}
@Bean
public JdbcMessageHandler barHandler() {
return new JdbcMessageHandler(dataSource,
"INSERT INTO BAR ...");
}
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
我原以为我可以在默认的 errorChannel 上获取错误并在那里执行必要的任务,但我还没有找到完成这项工作的方法。
非常感谢任何想法和帮助!
你知道我们似乎错过了在 Java DSL 中为 EnricherSpec
公开 errorChannel
- 请随时就此事提出 JIRA issue。但无论如何,看起来我们可以访问那个 属性:
.enrich(e -> {
ContentEnricher contentEnricher =
e.requestChannel(subChannel())
.requestPayload(Message::getPayload)
.propertyExpression("fooId", "payload.id"))
.get().getT2();
contentEnricher.setErrorChannel(enricherErrorChannel());
})
现在您可以将任何 .handle()
流添加到 enricherErrorChannel()
并在钓鱼时处理 ErrorMessage
。例如,通常 ErrorMessage
包含一个 payload
作为 MessagingException
,而我们有一个 failedMessage
属性。正是那个包含了关于你原来的 payload
和 headers
的信息:
我已经通过 Spring 集成定义了一个很好的流程,在标称情况下,它按照我想要的方式工作。
但是我还没有找到一种方法来定义处理错误的行为(即将输入行标记为失败)
这是我得到的错误:
[ask-scheduler-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to an exception while sending the request message:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [outboundGateway]; nested exception is...
这是我的流配置(简化):
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Autowired
private DataSource dataSource;
@Bean
public IntegrationFlow mainFlow() {
//noinspection unchecked
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(5000)
.transactional(transactionManager())))
.split()
.enrich(e -> e
.requestChannel(subChannel())
.requestPayload(Message::getPayload)
.propertyExpression("fooId", "payload.id"))
.handle(barHandler())
.get();
}
@Bean
public IntegrationFlow enricherFlow() {
return IntegrationFlows.from(subChannel())
.handle(outboundGateway())
.get();
}
@Bean
public MessageChannel subChannel() {
return new DirectChannel();
}
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource,
"select * from FOO");
}
@Bean
public JdbcOutboundGateway outboundGateway() {
...
}
@Bean
public JdbcMessageHandler barHandler() {
return new JdbcMessageHandler(dataSource,
"INSERT INTO BAR ...");
}
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
我原以为我可以在默认的 errorChannel 上获取错误并在那里执行必要的任务,但我还没有找到完成这项工作的方法。
非常感谢任何想法和帮助!
你知道我们似乎错过了在 Java DSL 中为 EnricherSpec
公开 errorChannel
- 请随时就此事提出 JIRA issue。但无论如何,看起来我们可以访问那个 属性:
.enrich(e -> {
ContentEnricher contentEnricher =
e.requestChannel(subChannel())
.requestPayload(Message::getPayload)
.propertyExpression("fooId", "payload.id"))
.get().getT2();
contentEnricher.setErrorChannel(enricherErrorChannel());
})
现在您可以将任何 .handle()
流添加到 enricherErrorChannel()
并在钓鱼时处理 ErrorMessage
。例如,通常 ErrorMessage
包含一个 payload
作为 MessagingException
,而我们有一个 failedMessage
属性。正是那个包含了关于你原来的 payload
和 headers
的信息: