当 ItemProcessor 抛出异常时停止任务
Stop task when there is an exception thrown in ItemProcessor
我正在设计一个 Spring 批处理,它读取多个 csv 文件。我已经使用分区来读取块中的每个文件并对其进行处理以解密 csv 中的特定列。在解密之前,如果我遇到任何验证错误,我会抛出自定义异常。
现在我想要的是,如果处理在第一行中发现任何验证错误,则不应处理其他行,并且作业应该结束。
我怎样才能做到这一点?我也尝试实现 ProcessorListener,但它没有 StepExecution 对象,因此我可以调用 SetTerminateOnly() 或 ExitStatus=Failed
另请注意,我有多个线程以不同的方式访问文件lines.I希望在第一次遇到错误时终止所有线程。
提前致谢
如果我们在处理器中抛出自定义异常,Spring 批处理将终止并标记作业失败,除非您设置 'skipable' 异常。您没有提到执行验证步骤的位置,您是在处理器中执行还是在 Reader 中执行?让我知道,因为这是 Spring 批次决定的地方。
在我的项目中,如果我想停止作业并抛出自定义异常,我们将验证逻辑放在 Tasklet 或处理器中并抛出异常,如下所示
private AccountInfoEntity getAccountInfo(Long partnerId) {
if(partnerId != null){
.....
return ....;
} else {
throw new ReportsException("XXXXX");
}
}
因此,我发现 运行 多个异步并发线程(Spring 批处理分区)才是真正的问题。虽然其中一个线程抛出异常,但其他线程并行运行,并执行到最后。
最后,作业整体失败,没有处理任何输出,但处理其余数据需要时间。
好吧,解决方法很简单。我们只需要在处理过程中遇到错误时停止 Job。
自定义处理器
public class MultiThreadedFlatFileItemProcessor implements ItemProcessor<BinFileVO, BinFileVO>,JobExecutionListener{
private JobExecution jobExecution;
private RSADecrypter decrypter;
public RSADecrypter getDecrypter() {
return decrypter;
}
public void setDecrypter(RSADecrypter decrypter) {
this.decrypter = decrypter;
}
@Override
/**
This method is used process the encrypted data
@param item
* */
public BinFileVO process(BinFileVO item) throws JobException {
if(null!=item.getEncryptedText() && !item.getEncryptedText().isEmpty()){
String decrypted = decrypter.getDecryptedText(item.getEncryptedText());
if(null!=decrypted && !decrypted.isEmpty()){
if(decrypted.matches("[0-9]+")){
if(decrypted.length() >= 12 && decrypted.length() <= 19){
item.setEncryptedText(decrypted);
}else{
this.jobExecution.stop();
throw new JobException(PropertyLoader.getValue(ApplicationConstants.DECRYPTED_CARD_NO_LENGTH_INVALID),item.getLineNumber());
}
}
}else{
this.jobExecution.stop();
throw new JobException(PropertyLoader.getValue(ApplicationConstants.EMPTY_ENCRYPTED_DATA),item.getLineNumber());
}
return item;
}
@Override
public void beforeJob(JobExecution jobExecution) {
this.jobExecution=jobExecution;
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
作业xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
.....>
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Job Details -->
<job id="simpleMultiThreadsReaderJob" xmlns="http://www.springframework.org/schema/batch">
<step id="step" >
<partition step="step1" partitioner="partitioner">
<handler grid-size="5" task-executor="taskExecutor"/>
</partition>
</step>
<listeners>
<listener ref="decryptingItemProcessor"/>
</listeners>
</job>
<step id="step1" xmlns="http://www.springframework.org/schema/batch">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" processor="decryptingItemProcessor" commit-interval="500"/>
<listeners>
<listener ref="customItemProcessorListener" />
</listeners>
</tasklet>
</step>
<!-- Processor Details -->
<bean id="decryptingItemProcessor" class="com.test.batch.io.MultiThreadedFlatFileItemProcessor">
<property name="decrypter" ref="rsaDecrypter" />
</bean>
<!-- RSA Decrypter class -->
<bean id="rsaDecrypter" class="test.batch.secure.rsa.client.RSADecrypter"/>
<!-- Partitioner Details -->
<bean class="org.springframework.batch.core.scope.StepScope" />
<bean id="partitioner" class="com.test.batch.partition.FlatFilePartitioner" scope="step">
<property name="resource" ref="inputFile"/>
</bean>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10"/>
</bean>
<!-- Step will need a transaction manager -->
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
........
.................
</beans>
这是日志
2016-09-01 06:32:40 INFO SimpleJobRepository:273 - Parent JobExecution is stopped, so passing message on to StepExecution
2016-09-01 06:32:43 INFO ThreadStepInterruptionPolicy:60 - Step interrupted through StepExecution
2016-09-01 06:32:43 INFO AbstractStep:216 - Encountered interruption executing step: Job interrupted status detected.
; org.springframework.batch.core.JobInterruptedException
2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :At line No. 1 : The decrypted card number is less than 12 or greater than 19 in length
2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :Job interrupted status detected.
2016-09-01 06:32:45 INFO SimpleJobLauncher:135 - Job: [FlowJob: [name=simpleMultiThreadsReaderJob]] completed with the following parameters: [{outputFile=/usr/local/pos/bulktokenization/csv/outputs/cc_output_EDWError_08162016.csv, partitionFile=/usr/local/pos/bulktokenization/csv/partitions/, inputFile=C:\usr\local\pos\bulktokenization\csv\inputs\cc_input_EDWError_08162016.csv, fileName=cc_input_EDWError_08162016}] and the following status: [FAILED]
2016-09-01 06:32:45 INFO BatchLauncher:122 - Exit Status : FAILED
2016-09-01 06:32:45 INFO BatchLauncher:123 - Time Taken : 8969
我正在设计一个 Spring 批处理,它读取多个 csv 文件。我已经使用分区来读取块中的每个文件并对其进行处理以解密 csv 中的特定列。在解密之前,如果我遇到任何验证错误,我会抛出自定义异常。
现在我想要的是,如果处理在第一行中发现任何验证错误,则不应处理其他行,并且作业应该结束。 我怎样才能做到这一点?我也尝试实现 ProcessorListener,但它没有 StepExecution 对象,因此我可以调用 SetTerminateOnly() 或 ExitStatus=Failed
另请注意,我有多个线程以不同的方式访问文件lines.I希望在第一次遇到错误时终止所有线程。
提前致谢
如果我们在处理器中抛出自定义异常,Spring 批处理将终止并标记作业失败,除非您设置 'skipable' 异常。您没有提到执行验证步骤的位置,您是在处理器中执行还是在 Reader 中执行?让我知道,因为这是 Spring 批次决定的地方。
在我的项目中,如果我想停止作业并抛出自定义异常,我们将验证逻辑放在 Tasklet 或处理器中并抛出异常,如下所示
private AccountInfoEntity getAccountInfo(Long partnerId) {
if(partnerId != null){
.....
return ....;
} else {
throw new ReportsException("XXXXX");
}
}
因此,我发现 运行 多个异步并发线程(Spring 批处理分区)才是真正的问题。虽然其中一个线程抛出异常,但其他线程并行运行,并执行到最后。 最后,作业整体失败,没有处理任何输出,但处理其余数据需要时间。
好吧,解决方法很简单。我们只需要在处理过程中遇到错误时停止 Job。
自定义处理器
public class MultiThreadedFlatFileItemProcessor implements ItemProcessor<BinFileVO, BinFileVO>,JobExecutionListener{
private JobExecution jobExecution;
private RSADecrypter decrypter;
public RSADecrypter getDecrypter() {
return decrypter;
}
public void setDecrypter(RSADecrypter decrypter) {
this.decrypter = decrypter;
}
@Override
/**
This method is used process the encrypted data
@param item
* */
public BinFileVO process(BinFileVO item) throws JobException {
if(null!=item.getEncryptedText() && !item.getEncryptedText().isEmpty()){
String decrypted = decrypter.getDecryptedText(item.getEncryptedText());
if(null!=decrypted && !decrypted.isEmpty()){
if(decrypted.matches("[0-9]+")){
if(decrypted.length() >= 12 && decrypted.length() <= 19){
item.setEncryptedText(decrypted);
}else{
this.jobExecution.stop();
throw new JobException(PropertyLoader.getValue(ApplicationConstants.DECRYPTED_CARD_NO_LENGTH_INVALID),item.getLineNumber());
}
}
}else{
this.jobExecution.stop();
throw new JobException(PropertyLoader.getValue(ApplicationConstants.EMPTY_ENCRYPTED_DATA),item.getLineNumber());
}
return item;
}
@Override
public void beforeJob(JobExecution jobExecution) {
this.jobExecution=jobExecution;
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
作业xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
.....>
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Job Details -->
<job id="simpleMultiThreadsReaderJob" xmlns="http://www.springframework.org/schema/batch">
<step id="step" >
<partition step="step1" partitioner="partitioner">
<handler grid-size="5" task-executor="taskExecutor"/>
</partition>
</step>
<listeners>
<listener ref="decryptingItemProcessor"/>
</listeners>
</job>
<step id="step1" xmlns="http://www.springframework.org/schema/batch">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" processor="decryptingItemProcessor" commit-interval="500"/>
<listeners>
<listener ref="customItemProcessorListener" />
</listeners>
</tasklet>
</step>
<!-- Processor Details -->
<bean id="decryptingItemProcessor" class="com.test.batch.io.MultiThreadedFlatFileItemProcessor">
<property name="decrypter" ref="rsaDecrypter" />
</bean>
<!-- RSA Decrypter class -->
<bean id="rsaDecrypter" class="test.batch.secure.rsa.client.RSADecrypter"/>
<!-- Partitioner Details -->
<bean class="org.springframework.batch.core.scope.StepScope" />
<bean id="partitioner" class="com.test.batch.partition.FlatFilePartitioner" scope="step">
<property name="resource" ref="inputFile"/>
</bean>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10"/>
</bean>
<!-- Step will need a transaction manager -->
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
........
.................
</beans>
这是日志
2016-09-01 06:32:40 INFO SimpleJobRepository:273 - Parent JobExecution is stopped, so passing message on to StepExecution
2016-09-01 06:32:43 INFO ThreadStepInterruptionPolicy:60 - Step interrupted through StepExecution
2016-09-01 06:32:43 INFO AbstractStep:216 - Encountered interruption executing step: Job interrupted status detected.
; org.springframework.batch.core.JobInterruptedException 2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :At line No. 1 : The decrypted card number is less than 12 or greater than 19 in length
2016-09-01 06:32:45 ERROR CustomJobListener:163 - exception :Job interrupted status detected.
2016-09-01 06:32:45 INFO SimpleJobLauncher:135 - Job: [FlowJob: [name=simpleMultiThreadsReaderJob]] completed with the following parameters: [{outputFile=/usr/local/pos/bulktokenization/csv/outputs/cc_output_EDWError_08162016.csv, partitionFile=/usr/local/pos/bulktokenization/csv/partitions/, inputFile=C:\usr\local\pos\bulktokenization\csv\inputs\cc_input_EDWError_08162016.csv, fileName=cc_input_EDWError_08162016}] and the following status: [FAILED]
2016-09-01 06:32:45 INFO BatchLauncher:122 - Exit Status : FAILED
2016-09-01 06:32:45 INFO BatchLauncher:123 - Time Taken : 8969