如何从从属步骤处理 reader 的 FlatFileParseException
How to handle reader's FlatFileParseException from slave step
我有 spring 批处理作业应用程序,它解析许多包含用户详细信息的 csv 文件。为了解析用户详细信息,我有一个从 csv 文件解析的 LineMapper。
所以第一个主步骤 reader 从位置读取所有文件,然后使用 Partitioner 我有从属步骤。所有从属步骤并行执行。这些从属步骤逐行解析每个文件以获取用户详细信息。处理后,它将处理过的文件移动到已处理文件夹。
在测试期间,我在文件中保留了一些错误的值,该文件导致从属步骤的 reader 导致 FlatFileParseException。在这里,我想将此文件移动到另一个文件夹,如 Failed 文件夹。但是我做不到。
如果任何从属步骤 reader 无法解析,我应该如何将文件写入失败文件夹?
我正在使用 Spring 引导,Spring 基于批处理注释
错误:
FlatFileParseException:资源中第 10 行的解析错误
JobExecutionException:分区处理程序返回不成功的步骤
代码:
BatchConfiguration.java
@Bean(name = "partitionerJob")
public Job partitionerJob()
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob")
.start(partitionStep())
.build();
}
@Bean
public Step partitionStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {
return new UserDetailReader(fileName);
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller
marshaller ,
@Value("#{stepExecutionContext[opFileName]}") String
filename )
throws MalformedURLException {
return new UserDetailWriter(fileName);
}
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<User, User>chunk(5)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}
CustomMultiResourcePartitioner.java
public class CustomMultiResourcePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName",
"output"+k+++".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}
感谢帮助
您可以在定义步骤时使用 skip 和 skipLimit 属性。
以下是示例配置。
现在,在达到 skipLimit 之前,您的从属 Step 不会失败。
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<User, User>chunk(5)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.faultTolerant()
.skip(Exception.class)
.skipLimit(500)
.build();
}
将文件移动到失败文件夹
您可以为您的 slaveStep 定义 StepExecutionListener。
在后续步骤方法中,您可以获得该步骤的 All FailureExceptions
。
使用它,您可以放置移动文件的逻辑
示例代码
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
LOGGER.info("MyStepListener beforeStep "+ stepExecution.getSummary());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
// move file from one folder to another
return null;
}
}
我有 spring 批处理作业应用程序,它解析许多包含用户详细信息的 csv 文件。为了解析用户详细信息,我有一个从 csv 文件解析的 LineMapper。
所以第一个主步骤 reader 从位置读取所有文件,然后使用 Partitioner 我有从属步骤。所有从属步骤并行执行。这些从属步骤逐行解析每个文件以获取用户详细信息。处理后,它将处理过的文件移动到已处理文件夹。
在测试期间,我在文件中保留了一些错误的值,该文件导致从属步骤的 reader 导致 FlatFileParseException。在这里,我想将此文件移动到另一个文件夹,如 Failed 文件夹。但是我做不到。
如果任何从属步骤 reader 无法解析,我应该如何将文件写入失败文件夹?
我正在使用 Spring 引导,Spring 基于批处理注释
错误:
FlatFileParseException:资源中第 10 行的解析错误
JobExecutionException:分区处理程序返回不成功的步骤
代码:
BatchConfiguration.java
@Bean(name = "partitionerJob")
public Job partitionerJob()
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob")
.start(partitionStep())
.build();
}
@Bean
public Step partitionStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {
return new UserDetailReader(fileName);
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller
marshaller ,
@Value("#{stepExecutionContext[opFileName]}") String
filename )
throws MalformedURLException {
return new UserDetailWriter(fileName);
}
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<User, User>chunk(5)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}
CustomMultiResourcePartitioner.java
public class CustomMultiResourcePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName",
"output"+k+++".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}
感谢帮助
您可以在定义步骤时使用 skip 和 skipLimit 属性。
以下是示例配置。 现在,在达到 skipLimit 之前,您的从属 Step 不会失败。
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<User, User>chunk(5)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.faultTolerant()
.skip(Exception.class)
.skipLimit(500)
.build();
}
将文件移动到失败文件夹
您可以为您的 slaveStep 定义 StepExecutionListener。
在后续步骤方法中,您可以获得该步骤的 All FailureExceptions
。
使用它,您可以放置移动文件的逻辑
示例代码
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
LOGGER.info("MyStepListener beforeStep "+ stepExecution.getSummary());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
// move file from one folder to another
return null;
}
}