Spring 使用 Java DSL 的批量集成/启动作业
Spring Batch Integration using Java DSL / launching jobs
我有一个工作的 spring boot/batch projet 包含 2 个工作。
我现在正在尝试仅使用 java 配置/java DSL 添加集成以从远程 SFTP 轮询文件,然后启动作业。
文件轮询工作正常,但我不知道如何在我的流程中启动作业,尽管阅读了这些链接:
Spring Batch Integration config using Java DSL
和
一些代码片段:
@Bean
public SessionFactory SftpSessionFactory()
{
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
sftpSessionFactory.setHost("myip");
sftpSessionFactory.setPort(22);
sftpSessionFactory.setUser("user");
sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));
return sftpSessionFactory;
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.deleteRemoteFiles(Boolean.FALSE)
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory("remote dir")
.regexFilter(".*\.txt$")
.localDirectory(new File("C:/sftp/")),
e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
.handle("FileMessageToJobRequest","toRequest")
// what to put next to process the jobRequest ?
对于 .handle("FileMessageToJobRequest","toRequest") 我使用此处描述的 http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html
非常感谢任何帮助,非常感谢。
EDIT 在 Gary 评论之后
我已经添加了,它不会编译 - 当然 - 因为我不明白请求是如何传播的:
.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
@Autowired
private JobLauncher jobLauncher;
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
我找到了一种使用@ServiceActivator 启动作业并将其添加到我的流程中的方法,但我不确定这是一个好习惯:
.handle("lauchBatchService", "launch")
@Component("lauchBatchService")
public class LaunchBatchService {
private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);
@Autowired
private JobLauncher jobLauncher;
@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
}
.handle(jobLaunchingGw())
// handle result
...
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
其中 jobLauncher()
是 JobLauncher
bean。
编辑
您的服务激活器与 JLG 的功能大致相同;它使用 this code.
您的 jobLauncher @Bean
有误。
@Bean
s是定义;他们不做这样的运行时事情
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
因为你已经在自动装配一个 JobLauncher
,就用它吧。
@Autowired
private JobLauncher jobLauncher;
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher);
}
我有一个工作的 spring boot/batch projet 包含 2 个工作。
我现在正在尝试仅使用 java 配置/java DSL 添加集成以从远程 SFTP 轮询文件,然后启动作业。
文件轮询工作正常,但我不知道如何在我的流程中启动作业,尽管阅读了这些链接:
Spring Batch Integration config using Java DSL
和
一些代码片段:
@Bean
public SessionFactory SftpSessionFactory()
{
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
sftpSessionFactory.setHost("myip");
sftpSessionFactory.setPort(22);
sftpSessionFactory.setUser("user");
sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));
return sftpSessionFactory;
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.deleteRemoteFiles(Boolean.FALSE)
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory("remote dir")
.regexFilter(".*\.txt$")
.localDirectory(new File("C:/sftp/")),
e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
.handle("FileMessageToJobRequest","toRequest")
// what to put next to process the jobRequest ?
对于 .handle("FileMessageToJobRequest","toRequest") 我使用此处描述的 http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html
非常感谢任何帮助,非常感谢。
EDIT 在 Gary 评论之后 我已经添加了,它不会编译 - 当然 - 因为我不明白请求是如何传播的:
.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
@Autowired
private JobLauncher jobLauncher;
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
我找到了一种使用@ServiceActivator 启动作业并将其添加到我的流程中的方法,但我不确定这是一个好习惯:
.handle("lauchBatchService", "launch")
@Component("lauchBatchService")
public class LaunchBatchService {
private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);
@Autowired
private JobLauncher jobLauncher;
@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
}
.handle(jobLaunchingGw())
// handle result
...
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
其中 jobLauncher()
是 JobLauncher
bean。
编辑
您的服务激活器与 JLG 的功能大致相同;它使用 this code.
您的 jobLauncher @Bean
有误。
@Bean
s是定义;他们不做这样的运行时事情
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
因为你已经在自动装配一个 JobLauncher
,就用它吧。
@Autowired
private JobLauncher jobLauncher;
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher);
}