Why is an exception in Spring Batch AsyncItemProcessor caught by SkipListener's onSkipInWrite method?
I'm writing a Spring Boot application that starts up, fetches several million database entries, converts each into a new, streamlined JSON format, and then publishes them all to a GCP PubSub topic. I'm trying to use Spring Batch for this, but I'm running into trouble implementing fault tolerance for my process. The database is riddled with data quality issues, and sometimes my conversion to JSON fails. When a failure occurs, I don't want the job to quit immediately; I want it to keep processing as many records as it can and, before finishing, report exactly which records failed so that I and/or my team can examine those problematic database entries.
To that end, I tried to use Spring Batch's SkipListener interface. But because I'm also using an AsyncItemProcessor and an AsyncItemWriter in my step, the exceptions that occur during processing are being caught by the SkipListener's onSkipInWrite() method rather than by onSkipInProcess(). Unfortunately, onSkipInWrite() has no access to the original database entity, so I can't store its ID in my list of problematic database entries.
Am I configuring something incorrectly? Is there any other way to get hold of the reader's original object when it fails the AsyncItemProcessor processing step?
Here's what I've tried...
I have a singleton Spring component in which I store the count of database entries I've processed successfully, plus up to 20 problematic database entries.
@Component
@Getter //lombok
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress() { processed++; }

    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailures.add(failure);
    }

    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DBProjection dbData;
    }
}
And I have a Spring Batch SkipListener that catches the failures and updates my status component accordingly:
@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DBProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DBProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        //This is getting called instead!! Even though the exception happened during processing :(
        //But I have no access to the original DBProjection data here, and messageFuture.get() gives me null.
    }
}
And then I configured my job like this:
@Configuration
public class ConversionBatchJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        //Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                // ^ for now this returns true for everything until 20 failures
                .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
This happens because the Future wrapped by the AsyncItemProcessor is only unwrapped in the AsyncItemWriter, so any exception that occurred during asynchronous processing surfaces at that point and is treated as a write exception rather than a processing exception. That is why onSkipInWrite is called instead of onSkipInProcess.
This is actually a known limitation of this pattern, documented in the Javadoc of AsyncItemProcessor. Here is an excerpt:
Because the Future is typically unwrapped in the ItemWriter,
there are lifecycle and stats limitations (since the framework doesn't know
what the result of the processor is).
While not an exhaustive list, things like StepExecution.filterCount will not
reflect the number of filtered items and
itemProcessListener.onProcessError(Object, Exception) will not be called.
The Javadoc notes that the list is not exhaustive, and the side effect you are seeing with the SkipListener is one of those limitations.