将消息转换为作业以使其 Spring 与批处理集成

Convert Message to Job to make it Spring Integration with Batch Processing

我正在尝试使用 Spring 集成以批处理方式处理一系列文件。我有一个非常古老的 xml,它试图将消息转换为工作

    <int:transformer ref="messageToJobTransformer"/>
    <batch-int:job-launching-gateway job-launcher="jobLauncher"/>

messageToJobTransformer 是一个 class 可以将 Message 转换为 Job。问题是我不知道这个文件现在在哪里,我也不想要 xml 配置。我希望它是纯 Java DSL。这是我的简单配置。

return IntegrationFlows.from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
                    .handle(jobLaunchingGw())                       
                    .get();

这是我的网关 bean。

@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
}

编辑:更新批处理配置 class。

@Configuration
    @EnableBatchProcessing
public class BatchConfig

{

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;




@Bean
        public ItemReader<String> reader(@Value({jobParameters['input.file.name']}") String filename) throws MalformedURLException
    {
        FlatFileItemReader<String> reader = new FlatFileItemReader<String>();       
        return reader;
    }
@Bean
public Job job() throws MalformedURLException
{
    return jobs.get("job").start(step()).build();
}

@Bean
public Step step() throws MalformedURLException
{
    return steps.get("step").<String, String> chunk(5).reader(reader())
    .writer(writer()).build();
}

@Bean
public ItemWriter<String> writer(@Value("#{jobParameters['input.file.name']}")
{
    FlatFileItemWriter writer = new FlatFileItemWriter();
    return writer;
}



}

你的问题不清楚。 JobLaunchingGateway 期望 JobLaunchRequest 作为 payload

由于您的集成流程从 Files.inboundAdapter(directory) 开始,我可以假设您有一些作业定义。所以,你需要的是一些可以解析文件的 class 和 return JobLaunchRequest.

来自 Spring 批次 Reference Manual 的类似内容:

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

在将 class 定义为 @Bean 之后,您可以在 .handle(jobLaunchingGw()).

之前从 .transform() EIP 方法中使用它

更新

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job job) {
     FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
     fileMessageToJobRequest.setJob(job);
     fileMessageToJobRequest.setfileParameterName("file");
     return fileMessageToJobRequest;
}
...

@Bean
public IntegrationFlow flowToBatch(FileMessageToJobRequest fileMessageToJobRequest) {

      return IntegrationFlows
            .from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
            .transform(fileMessageToJobRequest)
            .handle(jobLaunchingGw())                       
                    .get();

}