当 ItemProcessor 抛出异常时停止任务

Stop task when there is an exception thrown in ItemProcessor

我正在设计一个 Spring 批处理,它读取多个 csv 文件。我已经使用分区来读取块中的每个文件并对其进行处理以解密 csv 中的特定列。在解密之前,如果我遇到任何验证错误,我会抛出自定义异常。

现在我想要的是,如果处理在第一行中发现任何验证错误,则不应处理其他行,并且作业应该结束。 我怎样才能做到这一点?我也尝试实现 ProcessorListener,但它没有 StepExecution 对象,因此我可以调用 SetTerminateOnly() 或 ExitStatus=Failed

另请注意,我有多个线程以不同的方式访问文件lines.I希望在第一次遇到错误时终止所有线程。

提前致谢

如果我们在处理器中抛出自定义异常,Spring 批处理将终止并标记作业失败,除非您设置 'skipable' 异常。您没有提到执行验证步骤的位置,您是在处理器中执行还是在 Reader 中执行?让我知道,因为这是 Spring 批次决定的地方。

在我的项目中,如果我想停止作业并抛出自定义异常,我们将验证逻辑放在 Tasklet 或处理器中并抛出异常,如下所示

private AccountInfoEntity getAccountInfo(Long partnerId) {
        if(partnerId != null){
            .....
            return ....;
        } else {
            throw new ReportsException("XXXXX");
        }
    }

因此,我发现 运行 多个异步并发线程(Spring 批处理分区)才是真正的问题。虽然其中一个线程抛出异常,但其他线程并行运行,并执行到最后。 最后,作业整体失败,没有处理任何输出,但处理其余数据需要时间。

好吧,解决方法很简单。我们只需要在处理过程中遇到错误时停止 Job。

自定义处理器

  public class MultiThreadedFlatFileItemProcessor implements ItemProcessor<BinFileVO, BinFileVO>,JobExecutionListener{
    private JobExecution jobExecution;
    private RSADecrypter decrypter;
    
    public RSADecrypter getDecrypter() {
        return decrypter;
    }
    
    public void setDecrypter(RSADecrypter decrypter) { 
        this.decrypter = decrypter;
    }
    @Override
    /**
     This method is used process the encrypted data
     @param item
     * */
    public BinFileVO process(BinFileVO item) throws JobException {
        if(null!=item.getEncryptedText() && !item.getEncryptedText().isEmpty()){
            String decrypted = decrypter.getDecryptedText(item.getEncryptedText());
            if(null!=decrypted && !decrypted.isEmpty()){
                 if(decrypted.matches("[0-9]+")){
                     if(decrypted.length() >= 12 && decrypted.length() <= 19){
                         item.setEncryptedText(decrypted);
                     }else{
                         this.jobExecution.stop();
                         throw new JobException(PropertyLoader.getValue(ApplicationConstants.DECRYPTED_CARD_NO_LENGTH_INVALID),item.getLineNumber());
                     }
                 }
         }else{
             this.jobExecution.stop();
             throw new JobException(PropertyLoader.getValue(ApplicationConstants.EMPTY_ENCRYPTED_DATA),item.getLineNumber());
         }
         return item;
     }
    @Override
    public void beforeJob(JobExecution jobExecution) {
            this.jobExecution=jobExecution;
    }
    @Override
    public void afterJob(JobExecution jobExecution) {
        
    }
  }

作业xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
   .....>
  <!-- JobRepository and JobLauncher are configuration/setup classes -->
  <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
  <bean id="jobLauncher"    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
     <property name="jobRepository" ref="jobRepository" />
  </bean>       

 <!-- Job Details -->
    <job id="simpleMultiThreadsReaderJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="step" >
            <partition step="step1" partitioner="partitioner">
                <handler grid-size="5" task-executor="taskExecutor"/>
            </partition>
        </step>
        <listeners>
            <listener ref="decryptingItemProcessor"/>
        </listeners>
    </job>
  
    <step id="step1" xmlns="http://www.springframework.org/schema/batch">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" processor="decryptingItemProcessor" commit-interval="500"/>
            <listeners>
                <listener ref="customItemProcessorListener" />
            </listeners>
        </tasklet>
    </step>
    <!-- Processor Details -->
    <bean id="decryptingItemProcessor" class="com.test.batch.io.MultiThreadedFlatFileItemProcessor">
        <property name="decrypter" ref="rsaDecrypter" />
    </bean>     
    <!-- RSA Decrypter class -->    
    <bean id="rsaDecrypter" class="test.batch.secure.rsa.client.RSADecrypter"/>
    
    <!-- Partitioner Details -->
    <bean class="org.springframework.batch.core.scope.StepScope" />
    <bean id="partitioner" class="com.test.batch.partition.FlatFilePartitioner" scope="step">
        <property name="resource" ref="inputFile"/>
    </bean>
     <bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10"/>
    </bean>
   <!-- Step will need a transaction manager -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    ........
    .................
</beans>

这是日志

2016-09-01 06:32:40 INFO SimpleJobRepository:273 - Parent JobExecution is stopped, so passing message on to StepExecution

2016-09-01 06:32:43 INFO ThreadStepInterruptionPolicy:60 - Step interrupted through StepExecution

2016-09-01 06:32:43 INFO AbstractStep:216 - Encountered interruption executing step: Job interrupted status detected.

; org.springframework.batch.core.JobInterruptedException 2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :At line No. 1 : The decrypted card number is less than 12 or greater than 19 in length

2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :Job interrupted status detected.

2016-09-01 06:32:45 INFO SimpleJobLauncher:135 - Job: [FlowJob: [name=simpleMultiThreadsReaderJob]] completed with the following parameters: [{outputFile=/usr/local/pos/bulktokenization/csv/outputs/cc_output_EDWError_08162016.csv, partitionFile=/usr/local/pos/bulktokenization/csv/partitions/, inputFile=C:\usr\local\pos\bulktokenization\csv\inputs\cc_input_EDWError_08162016.csv, fileName=cc_input_EDWError_08162016}] and the following status: [FAILED]

2016-09-01 06:32:45 INFO BatchLauncher:122 - Exit Status : FAILED

2016-09-01 06:32:45 INFO BatchLauncher:123 - Time Taken : 8969