spring 中的 `ItemReader` 是否可以批处理直到数据可用于类似阻塞队列的处理的时间点?
Can `ItemReader` in spring batch wait until the point in time when the data can be available for processing similar like Blocking Queues?
目前我正在按照以下策略处理 step
中的项目。
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(processor)
.writer(writer).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
这个 Task Step
还做了一些额外的工作,比如压缩文件并将其复制到不同的位置,因此需要很长时间才能完成。我怎样才能将这个额外的工作卸载到后台线程。
如果后台线程一直轮询直到新文件到达进行压缩,那么它可能会消耗更多 CPU cycles
而如果我们可以让该线程等待并在新文件到达时通知它,那么它将变得更加复杂。
我怎样才能开始一个新的 TaskStep
与我上面现有的 TaskStep
并行,这样新的 TaskStep
的 ItemReader
等到时间点文件到达以进行处理,如 blocking queues
?
如果您将处理器定义为 AsyncItemProcessor,则可以将 "expensive" 工作委托给后台线程。您可以使用线程池和委托处理器为其分配任务执行器,这将在后台线程中完成实际工作。
项目reader 将接受其他文件并将它们分配给任务执行器中的线程。当后台线程完成文件处理后,它将被分配回 writer。
AsyncProcessor asyncProcessor = new AsyncProcessor();
asyncProcessor.setDelegate(processor);
asyncProcessor.setTaskExecutor(taskExecutor);
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(writer);
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(asyncProcessor)
.writer(asyncWriter).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
目前我正在按照以下策略处理 step
中的项目。
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(processor)
.writer(writer).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
这个 Task Step
还做了一些额外的工作,比如压缩文件并将其复制到不同的位置,因此需要很长时间才能完成。我怎样才能将这个额外的工作卸载到后台线程。
如果后台线程一直轮询直到新文件到达进行压缩,那么它可能会消耗更多 CPU cycles
而如果我们可以让该线程等待并在新文件到达时通知它,那么它将变得更加复杂。
我怎样才能开始一个新的 TaskStep
与我上面现有的 TaskStep
并行,这样新的 TaskStep
的 ItemReader
等到时间点文件到达以进行处理,如 blocking queues
?
如果您将处理器定义为 AsyncItemProcessor,则可以将 "expensive" 工作委托给后台线程。您可以使用线程池和委托处理器为其分配任务执行器,这将在后台线程中完成实际工作。
项目reader 将接受其他文件并将它们分配给任务执行器中的线程。当后台线程完成文件处理后,它将被分配回 writer。
AsyncProcessor asyncProcessor = new AsyncProcessor();
asyncProcessor.setDelegate(processor);
asyncProcessor.setTaskExecutor(taskExecutor);
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(writer);
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(asyncProcessor)
.writer(asyncWriter).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);