Spring 批处理 - 在 reader 处理器和编写器之间传递所有数据
Spring Batch - Pass all data between reader processor and writer
我很好奇如何设法将 reader 中的所有可用数据通过管道向下传递。
例如我希望 reader 提取所有数据并将整个结果集传递给处理器和编写器。结果集很小,我不担心资源。我认为我已经通过让所有组件(reader、编写器、处理器)接收和 return 已处理项目的集合来正确实现它。
虽然过程的结果看起来不错,但我看到的是作业正在读取所有内容,通过管道向下传递,然后 returns 到 reader, 阅读所有内容并传递下去等等。
我考虑过创建一个额外的步骤来读取所有数据并将其传递给后续步骤,但我很好奇我是否可以这样做以及如何做到这一点
这份工作看起来像
@Bean
Job job() throws Exception {
return jobs.get("job").start(step1()).build()
}
@Bean
protected Step step1() throws Exception {
return steps.get("step1").chunk(10)
.reader(reader()
.processor(processor()
.writer(writer()).build()
//....
reader,处理器和编写器接受并return一个列表,例如
class DomainItemProcessor implements ItemProcessor<List<Domain>, List<Domain>>{
此案例的高级设计将是
- Reader 将是自定义 reader。它将 return 列表或包含域对象列表的包装器。 reader 将注入一个 DAO bean 来执行查询并检索域列表。
public class 域列表 {
私有列表域;
// get/set
}
public class 域Reader 实现项目Reader {
@Autowire
private DomainDAO domainDAO;
private List<Domain> domains;
@Override
public DomainList read() throws Exception {
if (this.domains == null) {
// TODO: please replace with your business logic.
this.domains = this.domainDAO.getListofDomains();
return this.domains;
}
else {
return null; // to tell Spring Batch the reader has done.
}
}
}
- Processor 和 Writer 将以 DomainList 作为输入。
注:以上为伪代码
谢谢,
义
您也可以将其实现为 tasklet。由于您想一次处理所有数据,因此您实际上并没有批处理,因此,根本不会使用 "normal" spring 批处理步骤的整个重启和故障处理。
像这样的 tasklet 在伪代码中可能如下所示:
@Component
public class MyTasklet implements Tasklet {
@Autowired
private ItemReader<YourType> readerSpringBeanName;
@Autowired
private ItemProcessor<List<YourType>,List<YourType>> processorSpringBeanName;
@Autwired
private ItemWriter<List<YourType>> writerSpringBeanName;
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
readerSpringBeanName.open(new ExecutionContext());
writerSpringBeanName.open(new ExecutionContext());
List<YourType> items = new ArrayList<>();
YourType readItem = readerSpringBeanName.read();
while(readItem != null) {
items.add(readItem);
readItem = readerSpringBeanName.read();
}
writerSpringBeanName.write(processorSpringBeanName.process(items));
readerSpringBeanName.close();
writerSpringBeanName.close();
return RepeatStatus.FINISHED;
}
}
此外,根据您的用例,可能根本不需要定义 spring-批处理作业。
好的,这可能有点太晚了。但这是我对实施的看法
是的,您可以使用 itemreader、itemprocessor 和 itemwriter 来完成。可能有点矫枉过正,但还是可以做到的
主要问题(因为作业一直返回到 reader)我看到应该有一种方法可以告诉 spring 所有项目都已从 Itemreader 并且没有更多的对象可以读取。为此,当 spring 尝试读取更多对象时,您明确 return 一个空值。
所以这是一个示例 returning 来自 ItemReader 的列表
这里的 read() 方法应该有类似的实现
省略 Redis 实现,但这是它的要点,我声明了一个名为 -
的变量
iterateindex
像这样在项目 reader 的开头创建并初始化 iterateIndex
我还包含了 redisson 缓存来存储列表。同样可以否定
public class XXXConfigItemReader implements
ItemStreamReader<FeedbackConfigResponseModel> {
private int iterateIndex;
@Autowired
Environment env;
@Autowired
RestTemplateBuilder templateBuilder;
public DeferralConfigItemReader() {
this.iterateIndex = 0;
}
并确保 read() return 在达到列表大小时为 null
public List<FeedbackConfigResponseModel> read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// TODO Auto-generated method stub
// Get the config details from db
List<XXX> feedbackConfigModelList = new ArrayList<>;
// store all the values from the db or read from a file , read
//it line by line and marshall that to a list
// now on the first itemreader call, the iterateindex will not be
// equal to the list size and hence the entire list is returned
// in the first call
if (feedbackConfigModelList == null || this.iterateIndex == feedbackConfigModelList.size()) {
return null;
} else {
// and now we equate the list size and store it in iterateIndex
// the second call will return null.
this.iterateIndex = feedbackConfigModelList.size();
return feedbackConfigModelList;
}
}
希望对遇到同样问题的人有所帮助。
编辑:
显示如何使用 restTemplateBuilder。请注意,您可以不使用 RestTemplateBuilder,而是自动装配 RestTemplate。我利用 restTemplateBuilder 为我的 prj 需求提供了一些额外的配置
现在这是使用 itemstreamreader 接口
实现的完整项目reader
public class XXXX implements ItemStreamReader<FeedbackConfigResponseModel> {
private int iterateIndex;
@Autowired
Environment env;
@Autowired
RestTemplateBuilder templateBuilder;
@Autowired
RedissonClient redisClient;
public DeferralConfigItemReader() {
this.iterateIndex = -1;
this.feedbackConfigModelList = new ArrayList<>();
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public FeedbackConfigResponseModel read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// TODO Auto-generated method stub
String feedbackConfigFetchUrl = null;
ResponseEntity<FeedbackConfigResponseListModel> respModelEntity = null;
// if the cache is empty then fetch it from resttemplate
RList<FeedbackConfigResponseModel> rList = redisClient.getList(AppConstants.CACHE_DBCONFIG_LIST);
List<FeedbackConfigResponseModel> feedbackConfigModelList = new ArrayList<>();
FeedbackConfigResponseModel firstDbItem = rList.get(0);
if (firstDbItem == null) {
feedbackConfigFetchUrl = this.env.getProperty("restTemplate.default.url") + "/test";
respModelEntity = templateBuilder.build().getForEntity(feedbackConfigFetchUrl,
FeedbackConfigResponseListModel.class);
System.out.println("Response Model from template:" + respModelEntity.getBody());
feedbackConfigModelList = respModelEntity.getBody() == null ? null
: respModelEntity.getBody().getFeedbackResponseList();
rList.addAll(feedbackConfigModelList);
} else {
System.out.println("coming inside else");
feedbackConfigModelList = rList;
}
if (feedbackConfigModelList == null || this.iterateIndex == feedbackConfigModelList.size()) {
return null;
} else {
this.iterateIndex++;
System.out.println("itenrating index"+iterateIndex + feedbackConfigModelList.size());
return feedbackConfigModelList.get(iterateIndex);
}
}
}
我很好奇如何设法将 reader 中的所有可用数据通过管道向下传递。
例如我希望 reader 提取所有数据并将整个结果集传递给处理器和编写器。结果集很小,我不担心资源。我认为我已经通过让所有组件(reader、编写器、处理器)接收和 return 已处理项目的集合来正确实现它。
虽然过程的结果看起来不错,但我看到的是作业正在读取所有内容,通过管道向下传递,然后 returns 到 reader, 阅读所有内容并传递下去等等。
我考虑过创建一个额外的步骤来读取所有数据并将其传递给后续步骤,但我很好奇我是否可以这样做以及如何做到这一点
这份工作看起来像
@Bean
Job job() throws Exception {
return jobs.get("job").start(step1()).build()
}
@Bean
protected Step step1() throws Exception {
return steps.get("step1").chunk(10)
.reader(reader()
.processor(processor()
.writer(writer()).build()
//....
reader,处理器和编写器接受并return一个列表,例如
class DomainItemProcessor implements ItemProcessor<List<Domain>, List<Domain>>{
此案例的高级设计将是
- Reader 将是自定义 reader。它将 return 列表或包含域对象列表的包装器。 reader 将注入一个 DAO bean 来执行查询并检索域列表。
public class 域列表 { 私有列表域;
// get/set
}
public class 域Reader 实现项目Reader {
@Autowire
private DomainDAO domainDAO;
private List<Domain> domains;
@Override
public DomainList read() throws Exception {
if (this.domains == null) {
// TODO: please replace with your business logic.
this.domains = this.domainDAO.getListofDomains();
return this.domains;
}
else {
return null; // to tell Spring Batch the reader has done.
}
}
}
- Processor 和 Writer 将以 DomainList 作为输入。
注:以上为伪代码
谢谢, 义
您也可以将其实现为 tasklet。由于您想一次处理所有数据,因此您实际上并没有批处理,因此,根本不会使用 "normal" spring 批处理步骤的整个重启和故障处理。
像这样的 tasklet 在伪代码中可能如下所示:
@Component
public class MyTasklet implements Tasklet {
@Autowired
private ItemReader<YourType> readerSpringBeanName;
@Autowired
private ItemProcessor<List<YourType>,List<YourType>> processorSpringBeanName;
@Autwired
private ItemWriter<List<YourType>> writerSpringBeanName;
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
readerSpringBeanName.open(new ExecutionContext());
writerSpringBeanName.open(new ExecutionContext());
List<YourType> items = new ArrayList<>();
YourType readItem = readerSpringBeanName.read();
while(readItem != null) {
items.add(readItem);
readItem = readerSpringBeanName.read();
}
writerSpringBeanName.write(processorSpringBeanName.process(items));
readerSpringBeanName.close();
writerSpringBeanName.close();
return RepeatStatus.FINISHED;
}
}
此外,根据您的用例,可能根本不需要定义 spring-批处理作业。
好的,这可能有点太晚了。但这是我对实施的看法 是的,您可以使用 itemreader、itemprocessor 和 itemwriter 来完成。可能有点矫枉过正,但还是可以做到的
主要问题(因为作业一直返回到 reader)我看到应该有一种方法可以告诉 spring 所有项目都已从 Itemreader 并且没有更多的对象可以读取。为此,当 spring 尝试读取更多对象时,您明确 return 一个空值。
所以这是一个示例 returning 来自 ItemReader 的列表 这里的 read() 方法应该有类似的实现
省略 Redis 实现,但这是它的要点,我声明了一个名为 -
的变量iterateindex
像这样在项目 reader 的开头创建并初始化 iterateIndex 我还包含了 redisson 缓存来存储列表。同样可以否定
public class XXXConfigItemReader implements
ItemStreamReader<FeedbackConfigResponseModel> {
private int iterateIndex;
@Autowired
Environment env;
@Autowired
RestTemplateBuilder templateBuilder;
public DeferralConfigItemReader() {
this.iterateIndex = 0;
}
并确保 read() return 在达到列表大小时为 null
public List<FeedbackConfigResponseModel> read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// TODO Auto-generated method stub
// Get the config details from db
List<XXX> feedbackConfigModelList = new ArrayList<>;
// store all the values from the db or read from a file , read
//it line by line and marshall that to a list
// now on the first itemreader call, the iterateindex will not be
// equal to the list size and hence the entire list is returned
// in the first call
if (feedbackConfigModelList == null || this.iterateIndex == feedbackConfigModelList.size()) {
return null;
} else {
// and now we equate the list size and store it in iterateIndex
// the second call will return null.
this.iterateIndex = feedbackConfigModelList.size();
return feedbackConfigModelList;
}
}
希望对遇到同样问题的人有所帮助。
编辑: 显示如何使用 restTemplateBuilder。请注意,您可以不使用 RestTemplateBuilder,而是自动装配 RestTemplate。我利用 restTemplateBuilder 为我的 prj 需求提供了一些额外的配置
现在这是使用 itemstreamreader 接口
实现的完整项目reader public class XXXX implements ItemStreamReader<FeedbackConfigResponseModel> {
private int iterateIndex;
@Autowired
Environment env;
@Autowired
RestTemplateBuilder templateBuilder;
@Autowired
RedissonClient redisClient;
public DeferralConfigItemReader() {
this.iterateIndex = -1;
this.feedbackConfigModelList = new ArrayList<>();
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public FeedbackConfigResponseModel read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// TODO Auto-generated method stub
String feedbackConfigFetchUrl = null;
ResponseEntity<FeedbackConfigResponseListModel> respModelEntity = null;
// if the cache is empty then fetch it from resttemplate
RList<FeedbackConfigResponseModel> rList = redisClient.getList(AppConstants.CACHE_DBCONFIG_LIST);
List<FeedbackConfigResponseModel> feedbackConfigModelList = new ArrayList<>();
FeedbackConfigResponseModel firstDbItem = rList.get(0);
if (firstDbItem == null) {
feedbackConfigFetchUrl = this.env.getProperty("restTemplate.default.url") + "/test";
respModelEntity = templateBuilder.build().getForEntity(feedbackConfigFetchUrl,
FeedbackConfigResponseListModel.class);
System.out.println("Response Model from template:" + respModelEntity.getBody());
feedbackConfigModelList = respModelEntity.getBody() == null ? null
: respModelEntity.getBody().getFeedbackResponseList();
rList.addAll(feedbackConfigModelList);
} else {
System.out.println("coming inside else");
feedbackConfigModelList = rList;
}
if (feedbackConfigModelList == null || this.iterateIndex == feedbackConfigModelList.size()) {
return null;
} else {
this.iterateIndex++;
System.out.println("itenrating index"+iterateIndex + feedbackConfigModelList.size());
return feedbackConfigModelList.get(iterateIndex);
}
}
}