复制 header 标签到 xml spring 批量应用
Copy header tag in xml spring batch application
我在 spring-boot
应用程序中使用 spring-batch
。 Spring 引导版本是 2.3.3.RELEASE
。
What I intend to achieve
我必须 read a xml file
包含数千个 Transactions
和 header tag
(fileInformation)。在事务上做一些业务逻辑,然后用事务中的更新值写回文件。我使用 StaxEventItemReader
读取文件,使用 StaxEventItemWriter
写入文件。然后我有几个 ItemProcessors
用于处理业务逻辑。 Xml 文件看起来像:
<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
<fileInformation>
<sender>200GH7XZ60</sender>
<timestamp>2020-12-23T09:05:34Z</timestamp>
<environment>PRO</environment>
<version>001.60</version>
</fileInformation>
<record>
<transaction>
<buyer><buyer/>
</transaction>
<transaction>
<buyer><buyer/>
</transaction>
<transaction>
<buyer><buyer/>
</transaction>
</record>
</reportFile>
Problem that I am facing is with the value of header tags.
我已经配置了生成所需 header 标签的 OmegaXmlHeaderCallBack
,但这些标签中的值应该从输入文件中复制。据我所知,StaxWriterCallback
在 reader、处理器和编写器之前初始化。所以我无法使用 late binding
注入值。
这看起来像是基本要求,但在 Whosebug
.
上找不到任何解决方案
Here is the code snippets to configure spring batch job.
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
PIExtractorItemProcessor pIExtractorItemProcessor;
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Value( "${eugateway.batch.chunk.size}" )
private int chunkSize;
@Bean
public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
CompositeItemProcessor<CustomHeaderTransactionXmlElement,
ProcessorWriterDto> processor,
CompositeItemWriter<ProcessorWriterDto> writer,
EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
.<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
.reader(reader)
.processor(processor)
.listener(writeListener)
.writer(writer)
.build();
}
@Bean
public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
.incrementer(new RunIdIncrementer())
.start(jobStep)
.listener(jobListener)
.build();
}
@Bean
@StepScope
public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(@Value("#{jobParameters[file.path]}") String path) {
Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
transactionMarshaller.setClassesToBeBound (FileInformation.class, TransactionPositionReport.class);
log.info("Generating StaxEventItemReader");
return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
.name("headerTransaction")
.resource(new FileSystemResource(new FileSystemResource(path)))
.addFragmentRootElements("fileInformation", "transaction")
.unmarshaller(transactionMarshaller)
.build();
}
@Bean
@StepScope
OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version){
return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
}
@Bean
@StepScope
OmegaXmlFooterCallBack getOmegaXmlFooterCallBack(){
return new OmegaXmlFooterCallBack();
}
@StepScope
@Bean(name = "staxTransactionWriter")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
String exportFilePath = "C:\Users\sasharma\Documents\TO_BE_DELETED\eugateway\outputfile.xml";
Resource exportFileResource = new FileSystemResource(exportFilePath);
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setSupportDtd(true);
marshaller.setSupportJaxbElementClass(true);
marshaller.setClassesToBeBound(TransactionPositionReport.class);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.version("1.0")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("reportFile")
.headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
.footerCallback(getOmegaXmlFooterCallBack())
.shouldDeleteIfEmpty(true)
.build();
}
@Bean
@StepScope
public PIExtractorItemProcessor extractItemProcessor() {
log.info("Generating PIExtractorItemProcessor");
return new PIExtractorItemProcessor();
}
@Bean
public PIRemoverItemProcessor removeItemProcessor() {
log.info("Generating PIRemoverItemProcessor");
return new PIRemoverItemProcessor();
}
@Bean
@StepScope
CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
log.info("Generating CompositeItemProcessor");
CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
return itemProcessor;
}
@Bean
@StepScope
public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
log.info("Generating EdsClientItemWriter");
return new EdsClientItemWriter<>();
}
@Bean
@StepScope
public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
log.info("Generating OmegaXmlFileWriter");
return new OmegaXmlFileWriter(staxTransactionItemWriter(sender, timestamp, environment, version));
}
@Bean
@StepScope
public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
log.info("Generating CompositeItemWriter");
CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(sender, timestamp, environment, version)));
return compositeItemWriter;
}
}
下面是OmegaXmlHeaderCallBack
class。由于没有后期绑定,我总是在 header 标签中得到空值。
@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {
private String sender;
private String timestamp;
private String environment;
private String version;
public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
super();
this.sender = sender;
this.timestamp = timestamp;
this.environment = environment;
this.version = version;
}
@Override
public void write(XMLEventWriter writer) {
XMLEventFactory factory = XMLEventFactory.newInstance();
try {
writer.add(factory.createStartElement("", "", "fileInformation"));
writer.add(factory.createStartElement("", "", "sender"));
writer.add(factory.createCharacters(sender));
writer.add(factory.createEndElement("", "", "sender"));
writer.add(factory.createStartElement("", "", "timestamp"));
writer.add(factory.createCharacters(timestamp));
writer.add(factory.createEndElement("", "", "timestamp"));
writer.add(factory.createStartElement("", "", "environment"));
writer.add(factory.createCharacters(environment));
writer.add(factory.createEndElement("", "", "environment"));
writer.add(factory.createStartElement("", "", "version"));
writer.add(factory.createCharacters(version));
writer.add(factory.createEndElement("", "", "version"));
writer.add(factory.createEndElement("", "", "fileInformation"));
writer.add(factory.createStartElement("", "", "record"));
} catch (XMLStreamException e) {
log.error("Error writing OMEGA XML Header: {}", e.getMessage());
throw new OmegaXmlHeaderWriterException(e.getMessage());
}
}
}
ItemProcessor
的代码如下。我正在将 header 数据设置到 ExecutionContext
中,该数据旨在由 headerCallback 读取(遗憾的是不会发生)。
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {
@Autowired
PersonalDataExtractor personalDataExtractor;
@Value("#{jobParameters['submission.account']}")
private String subAccntId;
@Value("#{stepExecution}")
private StepExecution stepExecution;
@Override
public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
FileInformation header = null;
TransactionPositionReport transaction = null;
if(headerTransactionElement instanceof FileInformation) {
header = (FileInformation)headerTransactionElement;
stepExecution.getExecutionContext().put("header.sender", header.getSender());
stepExecution.getExecutionContext().put("header.timestamp", header.getTimestamp());
stepExecution.getExecutionContext().put("header.environment", header.getEnvironment());
stepExecution.getExecutionContext().put("header.version", header.getVersion());
log.debug("Header {} found.", header.toString());
return null;
} else {
transaction = (TransactionPositionReport)headerTransactionElement;
log.debug("NO header info found for transaction {}", transaction.getProcessingDetails().getCustomerTransactionId());
log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.", transaction.getProcessingDetails().getCustomerTransactionId());
ProcessorWriterDto transferObject = new ProcessorWriterDto();
transferObject.setEdsRequestDtoList(personalDataExtractor.extract(transaction, subAccntId));
transferObject.setTransaction(transaction);
return transferObject;
}
}
}
我引用的链接:
- accessing-the-executioncontext-values-in-headercallback
你的步骤做的太多了。我会将事情分为两个步骤:
- 第一步:提取文件信息header并放入作业执行上下文
- 第 2 步:从执行上下文中读取文件信息 header 并将其用于该步骤所需的任何 step-scoped bean(例如您的案例中的 stax 回调)
这是一个简单的例子:
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import javax.xml.stream.XMLEventWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.xml.StaxWriterCallback;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class SO67909123 {
@Bean
public Step extractHeaderStep(StepBuilderFactory steps) {
return steps.get("extractHeaderStep")
.tasklet((contribution, chunkContext) -> {
Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
String inputFile = (String) jobParameters.get("file");
FileInformation fileInformation = extractFileInformation(inputFile);
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobExecutionContext.put("file.information", fileInformation);
return RepeatStatus.FINISHED;
}).build();
}
private FileInformation extractFileInformation(String inputFile) {
// TODO extract header from inputFile
FileInformation fileInformation = new FileInformation();
fileInformation.sender = "200GH7XKDGO3GLZ60";
fileInformation.version = "001.60";
return fileInformation;
}
@Bean
public Step processFile(StepBuilderFactory steps) {
return steps.get("processFile")
.tasklet((contribution, chunkContext) -> { // Change this to a chunk-oriented tasklet
Map<String, Object> jobExecutionContext = chunkContext.getStepContext().getJobExecutionContext();
FileInformation fileInformation = (FileInformation) jobExecutionContext.get("file.information");
System.out.println("Step 2: " + fileInformation);
return RepeatStatus.FINISHED;
}).build();
}
@Bean
@StepScope
public StaxWriterCallback staxWriterCallback(@Value("#{jobExecutionContext['file.information']}") FileInformation fileInformation) {
return new StaxWriterCallback() {
@Override
public void write(XMLEventWriter writer) throws IOException {
// use fileInformation as needed here
}
};
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job")
.start(extractHeaderStep(steps))
.next(processFile(steps))
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(SO67909123.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParametersBuilder()
.addString("file", "transactions.xml")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
static class FileInformation implements Serializable {
private String sender;
private String version;
// other fields
@Override
public String toString() {
return "FileInformation{sender='" + sender + '\'' + ", version='" + version + '\'' + '}';
}
}
}
这个例子展示了这个想法。您只需编写从文件中提取 xml 标记的片段(仅 header,请参阅 TODO)。该示例中的 StaxWriterCallback
是一个 step-scoped bean,可以使用执行上下文中的 header。第 2 步中的其他 step-scoped 组件也可以用相同的方式配置(处理器、侦听器等)。
我在 spring-boot
应用程序中使用 spring-batch
。 Spring 引导版本是 2.3.3.RELEASE
。
What I intend to achieve
我必须 read a xml file
包含数千个 Transactions
和 header tag
(fileInformation)。在事务上做一些业务逻辑,然后用事务中的更新值写回文件。我使用 StaxEventItemReader
读取文件,使用 StaxEventItemWriter
写入文件。然后我有几个 ItemProcessors
用于处理业务逻辑。 Xml 文件看起来像:
<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
<fileInformation>
<sender>200GH7XZ60</sender>
<timestamp>2020-12-23T09:05:34Z</timestamp>
<environment>PRO</environment>
<version>001.60</version>
</fileInformation>
<record>
<transaction>
<buyer><buyer/>
</transaction>
<transaction>
<buyer><buyer/>
</transaction>
<transaction>
<buyer><buyer/>
</transaction>
</record>
</reportFile>
Problem that I am facing is with the value of header tags.
我已经配置了生成所需 header 标签的 OmegaXmlHeaderCallBack
,但这些标签中的值应该从输入文件中复制。据我所知,StaxWriterCallback
在 reader、处理器和编写器之前初始化。所以我无法使用 late binding
注入值。
这看起来像是基本要求,但在 Whosebug
.
Here is the code snippets to configure spring batch job.
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
PIExtractorItemProcessor pIExtractorItemProcessor;
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Value( "${eugateway.batch.chunk.size}" )
private int chunkSize;
@Bean
public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
CompositeItemProcessor<CustomHeaderTransactionXmlElement,
ProcessorWriterDto> processor,
CompositeItemWriter<ProcessorWriterDto> writer,
EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
.<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
.reader(reader)
.processor(processor)
.listener(writeListener)
.writer(writer)
.build();
}
@Bean
public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
.incrementer(new RunIdIncrementer())
.start(jobStep)
.listener(jobListener)
.build();
}
@Bean
@StepScope
public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(@Value("#{jobParameters[file.path]}") String path) {
Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
transactionMarshaller.setClassesToBeBound (FileInformation.class, TransactionPositionReport.class);
log.info("Generating StaxEventItemReader");
return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
.name("headerTransaction")
.resource(new FileSystemResource(new FileSystemResource(path)))
.addFragmentRootElements("fileInformation", "transaction")
.unmarshaller(transactionMarshaller)
.build();
}
@Bean
@StepScope
OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version){
return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
}
@Bean
@StepScope
OmegaXmlFooterCallBack getOmegaXmlFooterCallBack(){
return new OmegaXmlFooterCallBack();
}
@StepScope
@Bean(name = "staxTransactionWriter")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
String exportFilePath = "C:\Users\sasharma\Documents\TO_BE_DELETED\eugateway\outputfile.xml";
Resource exportFileResource = new FileSystemResource(exportFilePath);
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setSupportDtd(true);
marshaller.setSupportJaxbElementClass(true);
marshaller.setClassesToBeBound(TransactionPositionReport.class);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.version("1.0")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("reportFile")
.headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
.footerCallback(getOmegaXmlFooterCallBack())
.shouldDeleteIfEmpty(true)
.build();
}
@Bean
@StepScope
public PIExtractorItemProcessor extractItemProcessor() {
log.info("Generating PIExtractorItemProcessor");
return new PIExtractorItemProcessor();
}
@Bean
public PIRemoverItemProcessor removeItemProcessor() {
log.info("Generating PIRemoverItemProcessor");
return new PIRemoverItemProcessor();
}
@Bean
@StepScope
CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
log.info("Generating CompositeItemProcessor");
CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
return itemProcessor;
}
@Bean
@StepScope
public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
log.info("Generating EdsClientItemWriter");
return new EdsClientItemWriter<>();
}
@Bean
@StepScope
public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
log.info("Generating OmegaXmlFileWriter");
return new OmegaXmlFileWriter(staxTransactionItemWriter(sender, timestamp, environment, version));
}
@Bean
@StepScope
public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
@Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
@Value("#{jobExecutionContext['header.environment']}") String environment,
@Value("#{jobExecutionContext['header.version']}") String version) {
log.info("Generating CompositeItemWriter");
CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(sender, timestamp, environment, version)));
return compositeItemWriter;
}
}
下面是OmegaXmlHeaderCallBack
class。由于没有后期绑定,我总是在 header 标签中得到空值。
@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {
private String sender;
private String timestamp;
private String environment;
private String version;
public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
super();
this.sender = sender;
this.timestamp = timestamp;
this.environment = environment;
this.version = version;
}
@Override
public void write(XMLEventWriter writer) {
XMLEventFactory factory = XMLEventFactory.newInstance();
try {
writer.add(factory.createStartElement("", "", "fileInformation"));
writer.add(factory.createStartElement("", "", "sender"));
writer.add(factory.createCharacters(sender));
writer.add(factory.createEndElement("", "", "sender"));
writer.add(factory.createStartElement("", "", "timestamp"));
writer.add(factory.createCharacters(timestamp));
writer.add(factory.createEndElement("", "", "timestamp"));
writer.add(factory.createStartElement("", "", "environment"));
writer.add(factory.createCharacters(environment));
writer.add(factory.createEndElement("", "", "environment"));
writer.add(factory.createStartElement("", "", "version"));
writer.add(factory.createCharacters(version));
writer.add(factory.createEndElement("", "", "version"));
writer.add(factory.createEndElement("", "", "fileInformation"));
writer.add(factory.createStartElement("", "", "record"));
} catch (XMLStreamException e) {
log.error("Error writing OMEGA XML Header: {}", e.getMessage());
throw new OmegaXmlHeaderWriterException(e.getMessage());
}
}
}
ItemProcessor
的代码如下。我正在将 header 数据设置到 ExecutionContext
中,该数据旨在由 headerCallback 读取(遗憾的是不会发生)。
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {
@Autowired
PersonalDataExtractor personalDataExtractor;
@Value("#{jobParameters['submission.account']}")
private String subAccntId;
@Value("#{stepExecution}")
private StepExecution stepExecution;
@Override
public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
FileInformation header = null;
TransactionPositionReport transaction = null;
if(headerTransactionElement instanceof FileInformation) {
header = (FileInformation)headerTransactionElement;
stepExecution.getExecutionContext().put("header.sender", header.getSender());
stepExecution.getExecutionContext().put("header.timestamp", header.getTimestamp());
stepExecution.getExecutionContext().put("header.environment", header.getEnvironment());
stepExecution.getExecutionContext().put("header.version", header.getVersion());
log.debug("Header {} found.", header.toString());
return null;
} else {
transaction = (TransactionPositionReport)headerTransactionElement;
log.debug("NO header info found for transaction {}", transaction.getProcessingDetails().getCustomerTransactionId());
log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.", transaction.getProcessingDetails().getCustomerTransactionId());
ProcessorWriterDto transferObject = new ProcessorWriterDto();
transferObject.setEdsRequestDtoList(personalDataExtractor.extract(transaction, subAccntId));
transferObject.setTransaction(transaction);
return transferObject;
}
}
}
我引用的链接:
- accessing-the-executioncontext-values-in-headercallback
你的步骤做的太多了。我会将事情分为两个步骤:
- 第一步:提取文件信息header并放入作业执行上下文
- 第 2 步:从执行上下文中读取文件信息 header 并将其用于该步骤所需的任何 step-scoped bean(例如您的案例中的 stax 回调)
这是一个简单的例子:
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import javax.xml.stream.XMLEventWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.xml.StaxWriterCallback;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class SO67909123 {
@Bean
public Step extractHeaderStep(StepBuilderFactory steps) {
return steps.get("extractHeaderStep")
.tasklet((contribution, chunkContext) -> {
Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
String inputFile = (String) jobParameters.get("file");
FileInformation fileInformation = extractFileInformation(inputFile);
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobExecutionContext.put("file.information", fileInformation);
return RepeatStatus.FINISHED;
}).build();
}
private FileInformation extractFileInformation(String inputFile) {
// TODO extract header from inputFile
FileInformation fileInformation = new FileInformation();
fileInformation.sender = "200GH7XKDGO3GLZ60";
fileInformation.version = "001.60";
return fileInformation;
}
@Bean
public Step processFile(StepBuilderFactory steps) {
return steps.get("processFile")
.tasklet((contribution, chunkContext) -> { // Change this to a chunk-oriented tasklet
Map<String, Object> jobExecutionContext = chunkContext.getStepContext().getJobExecutionContext();
FileInformation fileInformation = (FileInformation) jobExecutionContext.get("file.information");
System.out.println("Step 2: " + fileInformation);
return RepeatStatus.FINISHED;
}).build();
}
@Bean
@StepScope
public StaxWriterCallback staxWriterCallback(@Value("#{jobExecutionContext['file.information']}") FileInformation fileInformation) {
return new StaxWriterCallback() {
@Override
public void write(XMLEventWriter writer) throws IOException {
// use fileInformation as needed here
}
};
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("job")
.start(extractHeaderStep(steps))
.next(processFile(steps))
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(SO67909123.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParametersBuilder()
.addString("file", "transactions.xml")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
static class FileInformation implements Serializable {
private String sender;
private String version;
// other fields
@Override
public String toString() {
return "FileInformation{sender='" + sender + '\'' + ", version='" + version + '\'' + '}';
}
}
}
这个例子展示了这个想法。您只需编写从文件中提取 xml 标记的片段(仅 header,请参阅 TODO)。该示例中的 StaxWriterCallback
是一个 step-scoped bean,可以使用执行上下文中的 header。第 2 步中的其他 step-scoped 组件也可以用相同的方式配置(处理器、侦听器等)。