将消息转换为作业以使其 Spring 与批处理集成
Convert Message to Job to make it Spring Integration with Batch Processing
我正在尝试使用 Spring 集成以批处理方式处理一系列文件。我有一个非常古老的 xml,它试图将消息转换为工作
<int:transformer ref="messageToJobTransformer"/>
<batch-int:job-launching-gateway job-launcher="jobLauncher"/>
messageToJobTransformer
是一个 class 可以将 Message 转换为 Job。问题是我不知道这个文件现在在哪里,我也不想要 xml 配置。我希望它是纯 Java DSL。这是我的简单配置。
return IntegrationFlows.from(Files.inboundAdapter(directory)
.preventDuplicates()
.patternFilter("*.txt")))
.handle(jobLaunchingGw())
.get();
这是我的网关 bean。
@Autowired
private JobLauncher jobLauncher;
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher);
}
编辑:更新批处理配置 class。
@Configuration
@EnableBatchProcessing
public class BatchConfig
{
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<String> reader(@Value({jobParameters['input.file.name']}") String filename) throws MalformedURLException
{
FlatFileItemReader<String> reader = new FlatFileItemReader<String>();
return reader;
}
@Bean
public Job job() throws MalformedURLException
{
return jobs.get("job").start(step()).build();
}
@Bean
public Step step() throws MalformedURLException
{
return steps.get("step").<String, String> chunk(5).reader(reader())
.writer(writer()).build();
}
@Bean
public ItemWriter<String> writer(@Value("#{jobParameters['input.file.name']}")
{
FlatFileItemWriter writer = new FlatFileItemWriter();
return writer;
}
}
你的问题不清楚。 JobLaunchingGateway
期望 JobLaunchRequest
作为 payload
。
由于您的集成流程从 Files.inboundAdapter(directory)
开始,我可以假设您有一些作业定义。所以,你需要的是一些可以解析文件的 class 和 return JobLaunchRequest
.
来自 Spring 批次 Reference Manual 的类似内容:
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
在将 class 定义为 @Bean
之后,您可以在 .handle(jobLaunchingGw())
.
之前从 .transform()
EIP 方法中使用它
更新
@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job job) {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(job);
fileMessageToJobRequest.setfileParameterName("file");
return fileMessageToJobRequest;
}
...
@Bean
public IntegrationFlow flowToBatch(FileMessageToJobRequest fileMessageToJobRequest) {
return IntegrationFlows
.from(Files.inboundAdapter(directory)
.preventDuplicates()
.patternFilter("*.txt")))
.transform(fileMessageToJobRequest)
.handle(jobLaunchingGw())
.get();
}
我正在尝试使用 Spring 集成以批处理方式处理一系列文件。我有一个非常古老的 xml,它试图将消息转换为工作
<int:transformer ref="messageToJobTransformer"/>
<batch-int:job-launching-gateway job-launcher="jobLauncher"/>
messageToJobTransformer
是一个 class 可以将 Message 转换为 Job。问题是我不知道这个文件现在在哪里,我也不想要 xml 配置。我希望它是纯 Java DSL。这是我的简单配置。
return IntegrationFlows.from(Files.inboundAdapter(directory)
.preventDuplicates()
.patternFilter("*.txt")))
.handle(jobLaunchingGw())
.get();
这是我的网关 bean。
@Autowired
private JobLauncher jobLauncher;
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher);
}
编辑:更新批处理配置 class。
@Configuration
@EnableBatchProcessing
public class BatchConfig
{
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<String> reader(@Value({jobParameters['input.file.name']}") String filename) throws MalformedURLException
{
FlatFileItemReader<String> reader = new FlatFileItemReader<String>();
return reader;
}
@Bean
public Job job() throws MalformedURLException
{
return jobs.get("job").start(step()).build();
}
@Bean
public Step step() throws MalformedURLException
{
return steps.get("step").<String, String> chunk(5).reader(reader())
.writer(writer()).build();
}
@Bean
public ItemWriter<String> writer(@Value("#{jobParameters['input.file.name']}")
{
FlatFileItemWriter writer = new FlatFileItemWriter();
return writer;
}
}
你的问题不清楚。 JobLaunchingGateway
期望 JobLaunchRequest
作为 payload
。
由于您的集成流程从 Files.inboundAdapter(directory)
开始,我可以假设您有一些作业定义。所以,你需要的是一些可以解析文件的 class 和 return JobLaunchRequest
.
来自 Spring 批次 Reference Manual 的类似内容:
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
在将 class 定义为 @Bean
之后,您可以在 .handle(jobLaunchingGw())
.
.transform()
EIP 方法中使用它
更新
@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job job) {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setJob(job);
fileMessageToJobRequest.setfileParameterName("file");
return fileMessageToJobRequest;
}
...
@Bean
public IntegrationFlow flowToBatch(FileMessageToJobRequest fileMessageToJobRequest) {
return IntegrationFlows
.from(Files.inboundAdapter(directory)
.preventDuplicates()
.patternFilter("*.txt")))
.transform(fileMessageToJobRequest)
.handle(jobLaunchingGw())
.get();
}