避免异常到达之前的 IntegrationFlows
Avoid exceptions reaching prior IntegrationFlows
我定义了两个 IntegrationFlows,它们都使用这个组件。一个从 ftp 读取,一个从磁盘读取文件。
@Bean
public IntegrationFlow csvLineFlowDefinition() {
return IntegrationFlows.from(CHANNEL_NAME)
.filter(String.class, m -> {
// filter to remove column definition csv line
return !m.startsWith("ID");
})
.<String, MyPrettyObject>transform(csvLinePayload -> {
String[] array = csvLinePayload.split(",");
MyPrettyObject myPrettyObject = new MyPrettyObject();
myPrettyObject.setId(array[0]);
myPrettyObject.setType(array[1]);
return myPrettyObject;
})
.<MyPrettyObject, String>route(myPrettyObject -> myPrettyObject.getType(),
routeResult -> routeResult
.channelMapping("AA", "AA_CHANNEL")
.channelMapping("BB", "BB_CHANNEL")
.channelMapping("CC", "CC_CHANNEL"))
.get();
}
我希望这两个 IntegrationFlow 仅在从 ftp 读取或从磁盘读取文件出现问题时才失败。
他们有自己定义的错误通道。
我不希望在将 csv 行转换为 MyPrettyObject 以到达这两个 IntegrationFlows 时出错。
我考虑过将原始 csv 行分派到消息队列,然后我可以在该消息队列的入站消费者上定义特定的错误通道。
然而这似乎有点矫枉过正。
我尝试为转换器插入一个 ExpressionEvaluatingRequestHandlerAdvice,但我不确定如何正确使用它,并且消息没有到达路由器或 ERROR_CHANNEL_NAME
@Bean
public ExpressionEvaluatingRequestHandlerAdvice csvLineTransformerAdvice() {
ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
expressionEvaluatingRequestHandlerAdvice.setFailureChannelName(ERROR_CHANNEL_NAME);
expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
return expressionEvaluatingRequestHandlerAdvice;
}
.<String, MyPrettyObject>transform(csvLinePayload -> {
String[] array = csvLinePayload.split(",");
MyPrettyObject myPrettyObject = new MyPrettyObject();
myPrettyObject.setId(array[0]);
myPrettyObject.setType(array[1]);
return myPrettyObject;
}, t -> t.advice(csvLineTransformerAdvice()))
恐怕 "something wrong with reading" 没有到达错误通道,因为还没有要处理的消息。因此,将入站通道适配器与流的其余部分隔离可能不是一个好主意。对于将任何下游错误传播到入站通道适配器上的错误通道来说,这是很正常的。
ExpressionEvaluatingRequestHandlerAdvice
是正确的方法,但您应该记住它仅适用于 transformer
。下游流程尚未涉及该建议。
如果出现错误,流程将停止,并且由于错误确实无法到达下一个端点。不知道你有什么顾虑...
我定义了两个 IntegrationFlows,它们都使用这个组件。一个从 ftp 读取,一个从磁盘读取文件。
@Bean
public IntegrationFlow csvLineFlowDefinition() {
return IntegrationFlows.from(CHANNEL_NAME)
.filter(String.class, m -> {
// filter to remove column definition csv line
return !m.startsWith("ID");
})
.<String, MyPrettyObject>transform(csvLinePayload -> {
String[] array = csvLinePayload.split(",");
MyPrettyObject myPrettyObject = new MyPrettyObject();
myPrettyObject.setId(array[0]);
myPrettyObject.setType(array[1]);
return myPrettyObject;
})
.<MyPrettyObject, String>route(myPrettyObject -> myPrettyObject.getType(),
routeResult -> routeResult
.channelMapping("AA", "AA_CHANNEL")
.channelMapping("BB", "BB_CHANNEL")
.channelMapping("CC", "CC_CHANNEL"))
.get();
}
我希望这两个 IntegrationFlow 仅在从 ftp 读取或从磁盘读取文件出现问题时才失败。 他们有自己定义的错误通道。 我不希望在将 csv 行转换为 MyPrettyObject 以到达这两个 IntegrationFlows 时出错。
我考虑过将原始 csv 行分派到消息队列,然后我可以在该消息队列的入站消费者上定义特定的错误通道。
然而这似乎有点矫枉过正。
我尝试为转换器插入一个 ExpressionEvaluatingRequestHandlerAdvice,但我不确定如何正确使用它,并且消息没有到达路由器或 ERROR_CHANNEL_NAME
@Bean
public ExpressionEvaluatingRequestHandlerAdvice csvLineTransformerAdvice() {
ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
expressionEvaluatingRequestHandlerAdvice.setFailureChannelName(ERROR_CHANNEL_NAME);
expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
return expressionEvaluatingRequestHandlerAdvice;
}
.<String, MyPrettyObject>transform(csvLinePayload -> {
String[] array = csvLinePayload.split(",");
MyPrettyObject myPrettyObject = new MyPrettyObject();
myPrettyObject.setId(array[0]);
myPrettyObject.setType(array[1]);
return myPrettyObject;
}, t -> t.advice(csvLineTransformerAdvice()))
恐怕 "something wrong with reading" 没有到达错误通道,因为还没有要处理的消息。因此,将入站通道适配器与流的其余部分隔离可能不是一个好主意。对于将任何下游错误传播到入站通道适配器上的错误通道来说,这是很正常的。
ExpressionEvaluatingRequestHandlerAdvice
是正确的方法,但您应该记住它仅适用于 transformer
。下游流程尚未涉及该建议。
如果出现错误,流程将停止,并且由于错误确实无法到达下一个端点。不知道你有什么顾虑...