Spring 异常后batch一直写记录
Spring batch keeps writing record after exception
我有一个 spring 批处理作业,我必须检查所有文件行中的 id 是否相等,并且应该跳过包含不同 id 的行。我所做的是保存第一条记录然后比较每行的 id,如果 id 不同则抛出运行时异常,但由于某种原因 spring 批处理工作直到它得到“要排除”的行并且然后通过在 exception 上写入所有记录来重复写入过程。
这就是我的意思:
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
2021-06-03 11:12:28.466 ERROR 41416 --- [ scheduling-1] tn.itserv.batch.SkipLinesListener : An error occured while writing the input Force rollback on skippable exception so that skipped item can be located.
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=7798661]
我的步骤:
@Bean
public Step loadFiles() throws IOException {
return stepBuilderFactory
.get("step1")
.<FileModelIn, FileModelOut>chunk(100)
.reader(multiResourceItemReader())
.processor(batchProcessor())
.writer(batchWriter())
.faultTolerant()
.skipPolicy(skipLinesListener())
.noRetry(RuntimeException.class)
.noRollback(RuntimeException.class)
.listener(MyStepListner())
.build();
}
跳过策略:
public class SkipLinesListener implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 10;
private static final Logger logger = LoggerFactory.getLogger(SkipLinesListener.class);
@Override
public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
if (t instanceof RuntimeException && skipCount < MAX_SKIP_COUNT )
{ RuntimeException ex=(RuntimeException)t;
logger.error("An error occured while writing the input "+ ex.getMessage());
return true;
}
if (t instanceof FlatFileParseException && skipCount < MAX_SKIP_COUNT ) {
FlatFileParseException ex = (FlatFileParseException) t;
logger.error("An error occured while processing the "+ ex.getInput());
return true;
}
return false;
}
}
我不知道为什么会出现这种行为,我是不是漏掉了什么?
我在 itemwriter class
中抛出异常
@Override
public void write(List<? extends FileModelOut> items) throws Exception {
List<FCCampaignModel> campaigns=new ArrayList<FCCampaignModel>();
List<sms> smsList=new ArrayList<>();
FCCampaignModel firstLine=cmsDaoProxy.addCampaign(items.get(0).getFcCampaignModel());
for (FileModelOut fileContent : items) {
if (fileContent.getFcCampaignModel().getId_Campaign().equals(firstLine.getId_Campaign()))
{
smsRepository.save(fileContent.getSms());
}
else throw new RuntimeException("different id campaign detected : "+fileContent.getFcCampaignModel().getId_Campaign());
}
你的异常被声明为一个可跳过的异常,所以当它从item writer抛出时,SpringBatch会逐个item扫描chunk,即一个一个重新处理item,每一个在自己的交易。
这是因为项目是按块写入的(即批量模式),如果在批量写入操作期间发生异常,Spring批处理无法知道是哪个项目导致了问题,因此它将重试他们一个一个。您可以在示例模块中找到示例:Chunk Scanning Sample.
我有一个 spring 批处理作业,我必须检查所有文件行中的 id 是否相等,并且应该跳过包含不同 id 的行。我所做的是保存第一条记录然后比较每行的 id,如果 id 不同则抛出运行时异常,但由于某种原因 spring 批处理工作直到它得到“要排除”的行并且然后通过在 exception 上写入所有记录来重复写入过程。 这就是我的意思:
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
2021-06-03 11:12:28.466 ERROR 41416 --- [ scheduling-1] tn.itserv.batch.SkipLinesListener : An error occured while writing the input Force rollback on skippable exception so that skipped item can be located.
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=7798661]
我的步骤:
@Bean
public Step loadFiles() throws IOException {
return stepBuilderFactory
.get("step1")
.<FileModelIn, FileModelOut>chunk(100)
.reader(multiResourceItemReader())
.processor(batchProcessor())
.writer(batchWriter())
.faultTolerant()
.skipPolicy(skipLinesListener())
.noRetry(RuntimeException.class)
.noRollback(RuntimeException.class)
.listener(MyStepListner())
.build();
}
跳过策略:
public class SkipLinesListener implements SkipPolicy {
private static final int MAX_SKIP_COUNT = 10;
private static final Logger logger = LoggerFactory.getLogger(SkipLinesListener.class);
@Override
public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
if (t instanceof RuntimeException && skipCount < MAX_SKIP_COUNT )
{ RuntimeException ex=(RuntimeException)t;
logger.error("An error occured while writing the input "+ ex.getMessage());
return true;
}
if (t instanceof FlatFileParseException && skipCount < MAX_SKIP_COUNT ) {
FlatFileParseException ex = (FlatFileParseException) t;
logger.error("An error occured while processing the "+ ex.getInput());
return true;
}
return false;
}
}
我不知道为什么会出现这种行为,我是不是漏掉了什么? 我在 itemwriter class
中抛出异常@Override
public void write(List<? extends FileModelOut> items) throws Exception {
List<FCCampaignModel> campaigns=new ArrayList<FCCampaignModel>();
List<sms> smsList=new ArrayList<>();
FCCampaignModel firstLine=cmsDaoProxy.addCampaign(items.get(0).getFcCampaignModel());
for (FileModelOut fileContent : items) {
if (fileContent.getFcCampaignModel().getId_Campaign().equals(firstLine.getId_Campaign()))
{
smsRepository.save(fileContent.getSms());
}
else throw new RuntimeException("different id campaign detected : "+fileContent.getFcCampaignModel().getId_Campaign());
}
你的异常被声明为一个可跳过的异常,所以当它从item writer抛出时,SpringBatch会逐个item扫描chunk,即一个一个重新处理item,每一个在自己的交易。
这是因为项目是按块写入的(即批量模式),如果在批量写入操作期间发生异常,Spring批处理无法知道是哪个项目导致了问题,因此它将重试他们一个一个。您可以在示例模块中找到示例:Chunk Scanning Sample.