Spring 使用 Java DSL 的批量集成/启动作业

Spring Batch Integration using Java DSL / launching jobs

我有一个工作的 spring boot/batch projet 包含 2 个工作。

我现在正在尝试仅使用 java 配置/java DSL 添加集成以从远程 SFTP 轮询文件,然后启动作业。

文件轮询工作正常,但我不知道如何在我的流程中启动作业,尽管阅读了这些链接:

Spring Batch Integration config using Java DSL

一些代码片段:

@Bean
public SessionFactory SftpSessionFactory()
    {
        DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
        sftpSessionFactory.setHost("myip");
        sftpSessionFactory.setPort(22);
        sftpSessionFactory.setUser("user");
        sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));

    return sftpSessionFactory;
}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
        .from(Sftp.inboundAdapter(SftpSessionFactory())
            .deleteRemoteFiles(Boolean.FALSE)
            .preserveTimestamp(Boolean.TRUE)
            .autoCreateLocalDirectory(Boolean.TRUE)
            .remoteDirectory("remote dir")
            .regexFilter(".*\.txt$")
            .localDirectory(new File("C:/sftp/")),
                e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
        .handle("FileMessageToJobRequest","toRequest")
        // what to put next to process the jobRequest ?

对于 .handle("FileMessageToJobRequest","toRequest") 我使用此处描述的 http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html

非常感谢任何帮助,非常感谢。

EDIT 在 Gary 评论之后 我已经添加了,它不会编译 - 当然 - 因为我不明白请求是如何传播的:

.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

@Autowired
private JobLauncher jobLauncher;

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}

我找到了一种使用@ServiceActivator 启动作业并将其添加到我的流程中的方法,但我不确定这是一个好习惯:

 .handle("lauchBatchService", "launch")

@Component("lauchBatchService")
public class LaunchBatchService {
    private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);

@Autowired
private JobLauncher jobLauncher;

@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {

    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());

    return execution;
}

}
    .handle(jobLaunchingGw())
    // handle result

...

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

其中 jobLauncher()JobLauncher bean。

编辑

您的服务激活器与 JLG 的功能大致相同;它使用 this code.

您的 jobLauncher @Bean 有误。

@Beans是定义;他们不做这样的运行时事情

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}

因为你已经在自动装配一个 JobLauncher,就用它吧。

@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
}