Spring 集成 - 自定义 errorChannel - 仅记录第一个异常
Spring Integration - custom errorChannel - only first exception gets logged
这是对上一个问题的跟进(原始问题中给出的要求)。
Spring Integration - Filter - Send messages to a different end point
我的问题 是,如果输入文件中有多个错误,则只会记录第一个错误。随后的错误未被记录。
修改后的代码:
@Configuration
public class CreateUserConfiguration {
@Bean
public IntegrationFlow createUser() {
return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
.enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
.transform(csvToUserBeanTransformer, "convertCsvToUserBean")
.split(userBeanSplitter, "splitUserBeans")
.wireTap(flow -> flow.<UserBean>filter(userBean -> !userBean.getStatus().equalsIgnoreCase("SUCCESS")).channel("errorSummaryReportGenerationChannel"))
.transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
.handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
.get();
}
@Bean
public IntegrationFlow logErrorSummary() {
return IntegrationFlows.from("errorSummaryReportGenerationChannel")
.handle((p,h) -> {
return ((UserBean)(p)).getUserID() + "\t" + ((UserBean)(p)).getStatus();
})
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
.get();
}
@Bean
public IntegrationFlow logError() {
return IntegrationFlows.from("exceptionChannel")
.enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
.wireTap(flow -> flow.handle(msg -> System.out.println("Received on exceptionChannel " + msg.getHeaders().get("errorFileName"))))
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))).autoCreateDirectory(true).fileExistsMode(FileExistsMode.APPEND).fileNameExpression("getHeaders().get(\"errorFileName\")+'.json'"))
.get();
}
@Bean(name = "exceptionChannel")
MessageChannel exceptionChannel() {
return MessageChannels.executor(new SimpleAsyncTaskExecutor()).get();
}
@Bean(name="errorSummaryReportGenerationChannel")
MessageChannel errorSummaryReportGenerationChannel() {
return DirectChannel();
}
}
我期待什么:
在错误摘要报告中 -
B123 ERROR, FREQUENCY
C123 FREQUENCY_DETAIL
在OUTPUT_FAILED_DIRECTORY-
B123.json -> stacktrace of error
C123.json -> stacktrace of error
我看到了什么:(缺少 C123 信息)
在错误摘要报告中 -
B123 ERROR, FREQUENCY
在OUTPUT_FAILED_DIRECTORY-
B123.json -> stacktrace of error
问题从 .split(userBeanSplitter, "splitUserBeans")
弹出。
我会说它与我们在 Java 中使用 for
循环所做的完全相似。
所以,如果循环中的方法抛出异常,你确实离开了循环,下一个项目将不再被处理。
要解决您的问题,您需要添加一个 .channel(c -> c.executor(myExecutor()))
来并行处理拆分的项目,并在单独的线程中进行错误处理。这样分离器中的循环就不会受到影响。
这是对上一个问题的跟进(原始问题中给出的要求)。
Spring Integration - Filter - Send messages to a different end point
我的问题 是,如果输入文件中有多个错误,则只会记录第一个错误。随后的错误未被记录。
修改后的代码:
@Configuration
public class CreateUserConfiguration {
@Bean
public IntegrationFlow createUser() {
return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
.enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
.transform(csvToUserBeanTransformer, "convertCsvToUserBean")
.split(userBeanSplitter, "splitUserBeans")
.wireTap(flow -> flow.<UserBean>filter(userBean -> !userBean.getStatus().equalsIgnoreCase("SUCCESS")).channel("errorSummaryReportGenerationChannel"))
.transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
.handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
.get();
}
@Bean
public IntegrationFlow logErrorSummary() {
return IntegrationFlows.from("errorSummaryReportGenerationChannel")
.handle((p,h) -> {
return ((UserBean)(p)).getUserID() + "\t" + ((UserBean)(p)).getStatus();
})
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
.get();
}
@Bean
public IntegrationFlow logError() {
return IntegrationFlows.from("exceptionChannel")
.enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
.wireTap(flow -> flow.handle(msg -> System.out.println("Received on exceptionChannel " + msg.getHeaders().get("errorFileName"))))
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))).autoCreateDirectory(true).fileExistsMode(FileExistsMode.APPEND).fileNameExpression("getHeaders().get(\"errorFileName\")+'.json'"))
.get();
}
@Bean(name = "exceptionChannel")
MessageChannel exceptionChannel() {
return MessageChannels.executor(new SimpleAsyncTaskExecutor()).get();
}
@Bean(name="errorSummaryReportGenerationChannel")
MessageChannel errorSummaryReportGenerationChannel() {
return DirectChannel();
}
}
我期待什么:
在错误摘要报告中 -
B123 ERROR, FREQUENCY
C123 FREQUENCY_DETAIL
在OUTPUT_FAILED_DIRECTORY-
B123.json -> stacktrace of error
C123.json -> stacktrace of error
我看到了什么:(缺少 C123 信息)
在错误摘要报告中 -
B123 ERROR, FREQUENCY
在OUTPUT_FAILED_DIRECTORY-
B123.json -> stacktrace of error
问题从 .split(userBeanSplitter, "splitUserBeans")
弹出。
我会说它与我们在 Java 中使用 for
循环所做的完全相似。
所以,如果循环中的方法抛出异常,你确实离开了循环,下一个项目将不再被处理。
要解决您的问题,您需要添加一个 .channel(c -> c.executor(myExecutor()))
来并行处理拆分的项目,并在单独的线程中进行错误处理。这样分离器中的循环就不会受到影响。