Spring 异常后batch一直写记录

Spring batch keeps writing record after exception

我有一个 spring 批处理作业,我必须检查所有文件行中的 id 是否相等,并且应该跳过包含不同 id 的行。我所做的是保存第一条记录然后比较每行的 id,如果 id 不同则抛出运行时异常,但由于某种原因 spring 批处理工作直到它得到“要排除”的行并且然后通过在 exception 上写入所有记录来重复写入过程。 这就是我的意思:

sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
2021-06-03 11:12:28.466 ERROR 41416 --- [   scheduling-1] tn.itserv.batch.SkipLinesListener        : An error occured while writing the input Force rollback on skippable exception so that skipped item can be located.
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=5598661]
sms [ Id_Campaign=7798661]

我的步骤:

    @Bean
    public Step loadFiles() throws IOException {
        return stepBuilderFactory
                .get("step1")
                .<FileModelIn, FileModelOut>chunk(100)
                .reader(multiResourceItemReader())
                .processor(batchProcessor())
                .writer(batchWriter())
                .faultTolerant()
                .skipPolicy(skipLinesListener())
                .noRetry(RuntimeException.class)
                .noRollback(RuntimeException.class)
                .listener(MyStepListner())
                .build();
    }

跳过策略:

public class SkipLinesListener implements SkipPolicy {
    private static final int MAX_SKIP_COUNT = 10;
    private static final Logger logger = LoggerFactory.getLogger(SkipLinesListener.class);

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
        if (t instanceof  RuntimeException && skipCount < MAX_SKIP_COUNT )
        { RuntimeException ex=(RuntimeException)t;
            logger.error("An error occured while writing the input "+ ex.getMessage());
            return true;
        }
        if (t instanceof FlatFileParseException && skipCount < MAX_SKIP_COUNT ) {
            FlatFileParseException ex = (FlatFileParseException) t;
           
         logger.error("An error occured while processing the "+ ex.getInput());
                return true;     
        }
    
    return false;
    
    }

}

我不知道为什么会出现这种行为,我是不是漏掉了什么? 我在 itemwriter class

中抛出异常
@Override
public void write(List<? extends FileModelOut> items) throws Exception {
        List<FCCampaignModel> campaigns=new ArrayList<FCCampaignModel>();   
        List<sms> smsList=new ArrayList<>();
        FCCampaignModel firstLine=cmsDaoProxy.addCampaign(items.get(0).getFcCampaignModel());
        
        for (FileModelOut fileContent : items) {
         if (fileContent.getFcCampaignModel().getId_Campaign().equals(firstLine.getId_Campaign()))
            {
             
            smsRepository.save(fileContent.getSms());
            }
         else throw new RuntimeException("different id campaign detected : "+fileContent.getFcCampaignModel().getId_Campaign());
         
        }

你的异常被声明为一个可跳过的异常,所以当它从item writer抛出时,SpringBatch会逐个item扫描chunk,即一个一个重新处理item,每一个在自己的交易。

这是因为项目是按块写入的(即批量模式),如果在批量写入操作期间发生异常,Spring批处理无法知道是哪个项目导致了问题,因此它将重试他们一个一个。您可以在示例模块中找到示例:Chunk Scanning Sample.