Spring 批处理:在 reader 和 writer 之间传递数据
Spring Batch: pass data between reader and writer
我想在 Writer 中获取我在步骤 Reader 中设置的数据。我通过 http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html#passingDataToFutureSteps
了解 ExecutionContexts(步骤和作业)和 ExecutionContextPromotionListener
问题是,在 Writer 中,我正在检索 'npag' 的空值。
ItemWriter 上的行:
LOG.info("INSIDE WRITE, NPAG: " + nPag);
我一直在做一些没有运气的解决方法,寻找其他 similar questions 的答案...有帮助吗?谢谢!
这是我的代码:
READER
@Component
public class LCItemReader implements ItemReader<String> {
private StepExecution stepExecution;
private int nPag = 1;
@Override
public String read() throws CustomItemReaderException {
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("npag", nPag);
nPag++;
return "content";
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
作者
@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {
private String nPag;
@Override
public void write(List<? extends String> continguts) throws Exception {
try {
LOG.info("INSIDE WRITE, NPAG: " + nPag);
} catch (Throwable ex) {
LOG.error("Error: " + ex.getMessage());
}
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.nPag = jobContext.get("npag").toString();
}
}
JOB/STEP 批量配置
@Bean
public Job lCJob() {
return jobs.get("lCJob")
.listener(jobListener)
.start(lCStep())
.build();
}
@Bean
public Step lCStep() {
return steps.get("lCStep")
.<String, String>chunk(1)
.reader(lCItemReader)
.processor(lCProcessor)
.writer(lCItemWriter)
.listener(promotionListener())
.build();
}
聆听者
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"npag"});
return executionContextPromotionListener;
}
ExecutionContextPromotionListener 明确声明它在步骤结束时工作,以便在编写器执行之后。所以我认为你指望的促销活动并没有在你认为它发生的时候发生。
如果我是你,我会在步骤上下文中设置它,如果你需要一个步骤中的值,我会从步骤中获取它。否则我会将其设置为工作上下文。
另一方面是@BeforeStep。这标志着在步骤上下文存在之前执行的方法。您在 reader 中设置 nPag 值的方式是在步骤开始执行之后。
您正在尝试读取 nPag 的值,甚至在 reader 中设置 nPag 之前,它的默认值为空。您需要在直接从执行上下文记录日志时读取 nPag 上的值。您可以保留对 jobContext 的引用。试试这个
@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {
private String nPag;
private ExecutionContext jobContext;
@Override
public void write(List<? extends String> continguts) throws Exception {
try {
this.nPag = jobContext.get("npag").toString();
LOG.info("INSIDE WRITE, NPAG: " + nPag);
} catch (Throwable ex) {
LOG.error("Error: " + ex.getMessage());
}
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
jobContext = jobExecution.getExecutionContext();
}
}
在您的 Reader 和 Writer 中,您需要实现 ItemStream 接口并使用 ExecutionContext 作为成员 variable.Here 我已经给出了 Processor 而不是 Writer 的示例,但同样适用于 Writer。它的工作对我来说很好,我能够从 reader 获取值到处理器。
我已在 reader 的上下文中设置值并在处理器中获取值。
public class EmployeeItemReader implements ItemReader<Employee>, ItemStream {
ExecutionContext context;
@Override
public Employee read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
context.put("ajay", "i am going well");
Employee emp=new Employee();
emp.setEmpId(1);
emp.setFirstName("ajay");
emp.setLastName("goswami");
return emp;
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void open(ExecutionContext arg0) throws ItemStreamException {
context = arg0;
}
@Override
public void update(ExecutionContext arg0) throws ItemStreamException {
// TODO Auto-generated method stub
context = arg0;
}
}
我的处理器
public class CustomItemProcessor implements ItemProcessor<Employee,ActiveEmployee>,ItemStream{
ExecutionContext context;
@Override
public ActiveEmployee process(Employee emp) throws Exception {
//See this line
System.out.println(context.get("ajay"));
ActiveEmployee actEmp=new ActiveEmployee();
actEmp.setEmpId(emp.getEmpId());
actEmp.setFirstName(emp.getFirstName());
actEmp.setLastName(emp.getLastName());
actEmp.setAdditionalInfo("Employee is processed");
return actEmp;
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void open(ExecutionContext arg0) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void update(ExecutionContext arg0) throws ItemStreamException {
context = arg0;
}
}
希望对您有所帮助。
我想在 Writer 中获取我在步骤 Reader 中设置的数据。我通过 http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html#passingDataToFutureSteps
了解 ExecutionContexts(步骤和作业)和 ExecutionContextPromotionListener问题是,在 Writer 中,我正在检索 'npag' 的空值。
ItemWriter 上的行:
LOG.info("INSIDE WRITE, NPAG: " + nPag);
我一直在做一些没有运气的解决方法,寻找其他 similar questions 的答案...有帮助吗?谢谢!
这是我的代码:
READER
@Component
public class LCItemReader implements ItemReader<String> {
private StepExecution stepExecution;
private int nPag = 1;
@Override
public String read() throws CustomItemReaderException {
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("npag", nPag);
nPag++;
return "content";
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
作者
@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {
private String nPag;
@Override
public void write(List<? extends String> continguts) throws Exception {
try {
LOG.info("INSIDE WRITE, NPAG: " + nPag);
} catch (Throwable ex) {
LOG.error("Error: " + ex.getMessage());
}
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.nPag = jobContext.get("npag").toString();
}
}
JOB/STEP 批量配置
@Bean
public Job lCJob() {
return jobs.get("lCJob")
.listener(jobListener)
.start(lCStep())
.build();
}
@Bean
public Step lCStep() {
return steps.get("lCStep")
.<String, String>chunk(1)
.reader(lCItemReader)
.processor(lCProcessor)
.writer(lCItemWriter)
.listener(promotionListener())
.build();
}
聆听者
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"npag"});
return executionContextPromotionListener;
}
ExecutionContextPromotionListener 明确声明它在步骤结束时工作,以便在编写器执行之后。所以我认为你指望的促销活动并没有在你认为它发生的时候发生。
如果我是你,我会在步骤上下文中设置它,如果你需要一个步骤中的值,我会从步骤中获取它。否则我会将其设置为工作上下文。
另一方面是@BeforeStep。这标志着在步骤上下文存在之前执行的方法。您在 reader 中设置 nPag 值的方式是在步骤开始执行之后。
您正在尝试读取 nPag 的值,甚至在 reader 中设置 nPag 之前,它的默认值为空。您需要在直接从执行上下文记录日志时读取 nPag 上的值。您可以保留对 jobContext 的引用。试试这个
@Component
@StepScope
public class LCItemWriter implements ItemWriter<String> {
private String nPag;
private ExecutionContext jobContext;
@Override
public void write(List<? extends String> continguts) throws Exception {
try {
this.nPag = jobContext.get("npag").toString();
LOG.info("INSIDE WRITE, NPAG: " + nPag);
} catch (Throwable ex) {
LOG.error("Error: " + ex.getMessage());
}
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
jobContext = jobExecution.getExecutionContext();
}
}
在您的 Reader 和 Writer 中,您需要实现 ItemStream 接口并使用 ExecutionContext 作为成员 variable.Here 我已经给出了 Processor 而不是 Writer 的示例,但同样适用于 Writer。它的工作对我来说很好,我能够从 reader 获取值到处理器。
我已在 reader 的上下文中设置值并在处理器中获取值。
public class EmployeeItemReader implements ItemReader<Employee>, ItemStream {
ExecutionContext context;
@Override
public Employee read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
context.put("ajay", "i am going well");
Employee emp=new Employee();
emp.setEmpId(1);
emp.setFirstName("ajay");
emp.setLastName("goswami");
return emp;
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void open(ExecutionContext arg0) throws ItemStreamException {
context = arg0;
}
@Override
public void update(ExecutionContext arg0) throws ItemStreamException {
// TODO Auto-generated method stub
context = arg0;
}
}
我的处理器
public class CustomItemProcessor implements ItemProcessor<Employee,ActiveEmployee>,ItemStream{
ExecutionContext context;
@Override
public ActiveEmployee process(Employee emp) throws Exception {
//See this line
System.out.println(context.get("ajay"));
ActiveEmployee actEmp=new ActiveEmployee();
actEmp.setEmpId(emp.getEmpId());
actEmp.setFirstName(emp.getFirstName());
actEmp.setLastName(emp.getLastName());
actEmp.setAdditionalInfo("Employee is processed");
return actEmp;
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void open(ExecutionContext arg0) throws ItemStreamException {
// TODO Auto-generated method stub
}
@Override
public void update(ExecutionContext arg0) throws ItemStreamException {
context = arg0;
}
}
希望对您有所帮助。