Spring 集成 - 自定义 errorChannel - 仅记录第一个异常

Spring Integration - custom errorChannel - only first exception gets logged

这是对上一个问题的跟进(原始问题中给出的要求)。

Spring Integration - Filter - Send messages to a different end point

我的问题 是,如果输入文件中有多个错误,则只会记录第一个错误。随后的错误未被记录。

修改后的代码:

@Configuration
public class CreateUserConfiguration {
    @Bean
    public IntegrationFlow createUser() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
                .enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
                .transform(csvToUserBeanTransformer, "convertCsvToUserBean")
                .split(userBeanSplitter, "splitUserBeans")
                .wireTap(flow -> flow.<UserBean>filter(userBean -> !userBean.getStatus().equalsIgnoreCase("SUCCESS")).channel("errorSummaryReportGenerationChannel"))
                .transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
                .handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
                .get();
    }

    @Bean
    public IntegrationFlow logErrorSummary() {
        return IntegrationFlows.from("errorSummaryReportGenerationChannel")
                .handle((p,h) -> {
                    return ((UserBean)(p)).getUserID() + "\t" + ((UserBean)(p)).getStatus();
                })
                .transform(Transformers.objectToString())
                .handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
                .get();
    }

    @Bean
    public IntegrationFlow logError() {
        return IntegrationFlows.from("exceptionChannel")
                .enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
                .wireTap(flow -> flow.handle(msg -> System.out.println("Received on exceptionChannel " + msg.getHeaders().get("errorFileName"))))
                .transform(Transformers.objectToString())
                .handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))).autoCreateDirectory(true).fileExistsMode(FileExistsMode.APPEND).fileNameExpression("getHeaders().get(\"errorFileName\")+'.json'"))
                .get();
    }

    @Bean(name = "exceptionChannel")
    MessageChannel exceptionChannel() {
        return MessageChannels.executor(new SimpleAsyncTaskExecutor()).get();
    }

    @Bean(name="errorSummaryReportGenerationChannel")
    MessageChannel errorSummaryReportGenerationChannel() {
        return DirectChannel();
    }
}

我期待什么:

在错误摘要报告中 -

B123  ERROR, FREQUENCY
C123  FREQUENCY_DETAIL

在OUTPUT_FAILED_DIRECTORY-

B123.json -> stacktrace of error
C123.json -> stacktrace of error

我看到了什么:(缺少 C123 信息)

在错误摘要报告中 -

B123  ERROR, FREQUENCY

在OUTPUT_FAILED_DIRECTORY-

B123.json -> stacktrace of error

问题从 .split(userBeanSplitter, "splitUserBeans") 弹出。

我会说它与我们在 Java 中使用 for 循环所做的完全相似。 所以,如果循环中的方法抛出异常,你确实离开了循环,下一个项目将不再被处理。

要解决您的问题,您需要添加一个 .channel(c -> c.executor(myExecutor())) 来并行处理拆分的项目,并在单独的线程中进行错误处理。这样分离器中的循环就不会受到影响。