How to read a file with Apache Camel, process it with Spring Batch, read each line and route it back through Apache Camel
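I would like to know how to pass a JAXBElement to a Camel route. The element is built from each line of a batch file, and the Spring Batch job that reads the file is itself launched from a Camel route.
The code below uses the customerWriter method, which goes through the JmsTemplate to write each message to a queue. Instead, I need to route the messages to another Camel route.
Current: CamelRoute -> ReadFile -> Spring Batch -> Process Each line -> Queue
Expected: CamelRoute -> ReadFile -> Spring Batch -> Process Each line -> Camel Route
Camel route that reads the file: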
@Override
public void configure() {
    // Endpoint URI = load path plus its options
    String fromUri = batchLoadPath + "?" + batchFileOptions;
    from(fromUri).process(new Processor() {
        public void process(Exchange msg) {
            File file = msg.getIn().getBody(File.class);
            String fileName = file.getAbsolutePath();
            try {
                // Launch the batch job once per incoming file
                JobParameters jobParameters = new JobParametersBuilder()
                        .addString("input.file.name", fileName)
                        .addDate("dateTime", new Date())
                        .toJobParameters();
                jobLauncher.run(importCustomerJob, jobParameters);
            } catch (Exception e) {
                log.error("Process file encountered error: " + e.getMessage(), e);
            }
        }
    })
    .to("log:EndBatch");
}
Batch configuration:
@Bean
public JmsItemWriter<String> customerWriter() {
    JmsItemWriter<String> writer = new JmsItemWriter<String>();
    writer.setJmsTemplate(jmsTemplate);
    return writer;
}

@Bean
public Job importCustomerJob(JobCompletionNotificationListener listener, JobBuilderFactory jobBuilderFactory, Step step1) {
    JobBuilder builder = jobBuilderFactory.get("importCustomerJob");
    builder.incrementer(new RunIdIncrementer());
    builder.listener(listener);
    JobFlowBuilder jfb = builder.flow(step1);
    jfb.end();
    Job job = jfb.build().build();
    return job;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read records in chunks of 10 and write them as messages to the queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerWriter())
            .listener(customerReader())
            .build();
}
Batch processor:
public class CustomerItemProcessor implements ItemProcessor<Customer, String> {
    @Autowired
    JaxbUtil jaxbUtil;

    public String process(Customer item) throws Exception {
        // Mapping code goes here
        JAXBElement<CustomerX> mobj = customerFactory.createCustomerX(cp);
        return jaxbUtil.objectToXml(mobj);
    }
}
Well, from Camel's point of view, to call another Camel route you simply add a .to() statement. For example, to call an in-memory route synchronously, you can use direct:.
from(fromUri).process(new Processor() {
    public void process(Exchange msg) {
        // ... your processor impl
    }
})
.to("direct:yourOtherRoute")
.to("log:EndBatch");

from("direct:yourOtherRoute")
...
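To pass the processor's result on to the next route, the processor has to set that result as the Exchange body.
Thanks to @Burki and @Roman Vottner for the suggestions. Here is my modified code, which works.
Solution:
In the batch configuration I added a new writer method and call it in place of the JMS writer.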
@Bean
public CamelItemWriter<String> customerCamelWriter() {
    // Items written by the step are sent to the direct:process endpoint
    ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
    CamelItemWriter<String> writer = new CamelItemWriter<String>(producerTemplate, "direct:process");
    return writer;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read records in chunks of 10 and write them as messages to the CAS.TRF.MDM queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerCamelWriter())
            .listener(customerReader())
            .build();
}
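For anyone wondering why swapping the writer is enough: as far as I understand it, CamelItemWriter just loops over the items of each chunk and sends every item as its own message body to the configured endpoint. Below is a rough plain-Java model of that behaviour, not the real implementation. BodySender, ForwardingWriter and ForwardingWriterDemo are hypothetical stand-ins I made up so the sketch runs without Camel or Spring Batch on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the one ProducerTemplate call the writer relies on.
interface BodySender {
    void sendBody(String endpointUri, Object body);
}

// Hypothetical model of a chunk writer that forwards each item to one endpoint.
class ForwardingWriter<T> {
    private final BodySender sender;
    private final String endpointUri;

    ForwardingWriter(BodySender sender, String endpointUri) {
        this.sender = sender;
        this.endpointUri = endpointUri;
    }

    // Called once per chunk; every item becomes a separate message.
    void write(List<? extends T> items) {
        for (T item : items) {
            sender.sendBody(endpointUri, item);
        }
    }
}

class ForwardingWriterDemo {
    // Pushes one chunk through the writer and returns what the endpoint received.
    static List<String> deliver(List<String> chunk) {
        List<String> received = new ArrayList<>();
        BodySender recordingSender = (uri, body) -> received.add(uri + " <- " + body);
        new ForwardingWriter<String>(recordingSender, "direct:process").write(chunk);
        return received;
    }

    public static void main(String[] args) {
        // Two already-marshalled items standing in for one chunk of the step
        for (String line : deliver(Arrays.asList("<customer>1</customer>", "<customer>2</customer>"))) {
            System.out.println(line);
        }
    }
}
```

So with chunk(10) the route behind direct:process should see up to ten separate exchanges per chunk, each carrying one XML string as its body, rather than one exchange holding the whole list.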