Spring Batch in-memory (MapJobRepositoryFactoryBean): clear old jobs but not the running job
I am using spring-batch to schedule batch jobs in memory, as a project-specific requirement (i.e. not for production; it is only used in a test environment). Below is what my configuration classes look like:
// Batch Scheduler class
package org.learning.scheduler;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.SimpleJobExplorer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* In-memory job configuration
*
*/
@EnableScheduling
@Configuration
public class InmemoryJobConfig {
@Bean
public ResourcelessTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean(ResourcelessTransactionManager resourcelessTransactionManager) throws Exception {
MapJobRepositoryFactoryBean factoryBean = new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
factoryBean.afterPropertiesSet();
return factoryBean;
}
@Bean
public JobRepository jobRepository(MapJobRepositoryFactoryBean factoryBean) throws Exception{
return (JobRepository) factoryBean.getObject();
}
@Bean
public JobExplorer jobExplorer(MapJobRepositoryFactoryBean repositoryFactory) {
return new SimpleJobExplorer(repositoryFactory.getJobInstanceDao(), repositoryFactory.getJobExecutionDao(),
repositoryFactory.getStepExecutionDao(), repositoryFactory.getExecutionContextDao());
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
return simpleJobLauncher;
}
}
// Job Configuration class
/**
* Batch Entry Point for Scheduler for all Jobs
*
*
*/
@Import({InmemoryJobConfig.class})
@EnableBatchProcessing
@Configuration
@Slf4j
public class BatchScheduler {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Autowired
private SimpleJobLauncher jobLauncher;
@Autowired
private JobExplorer jobExplorer;
@Autowired
private MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean;
@Bean
public ItemReader<UserDTO> userReader() {
return new UserReader();
}
@Bean
public ItemWriter<User> userWriter() {
return new UserWriter();
}
@Bean
public ItemReader<OrderDTO> orderReader() {
return new OrderReader();
}
@Bean
public ItemWriter<Order> orderWriter() {
return new OrderWriter();
}
@Bean
public Step userStep(ItemReader<UserDTO> reader, ItemWriter<User> writer) {
return steps.get("userStep")
.<UserDTO, User>chunk(20)
.reader(userReader())
.processor(new UserProcessor())
.writer(userWriter())
.build();
}
@Bean
public Step orderStep(ItemReader<OrderDTO> reader, ItemWriter<Order> writer) {
return steps.get("orderStep")
.<OrderDTO, Order>chunk(20)
.reader(orderReader())
.processor(new OrderProcessor())
.writer(orderWriter())
.build();
}
@Bean
public Job userJob() {
return jobs.get("userJob").incrementer(new RunIdIncrementer()).start(userStep(userReader(), userWriter())).build();
}
@Bean
public Job orderJob() {
return jobs.get("orderJob").incrementer(new RunIdIncrementer()).start(orderStep(orderReader(), orderWriter())).build();
}
@Scheduled(cron = "0 0/15 * * * ?")
public void scheduleUserJob() throws JobExecutionException {
Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("userJob");
if (!runningJob.isEmpty()) {
throw new JobExecutionException(" User Job is already in Start State ");
}
JobParameters userParam =
new JobParametersBuilder().addLong("date", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(userJob(), userParam);
}
@Scheduled(cron = "0 0/15 * * * ?")
public void scheduleOrderJob() throws JobExecutionException {
Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("orderJob");
if (!runningJob.isEmpty()) {
throw new JobExecutionException(" Order Job is already in Start State ");
}
JobParameters orderParam =
new JobParametersBuilder().addLong("date", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(orderJob(), orderParam);
}
@Scheduled(cron = "0 0/30 * * * ?")
public void scheduleCleanupMemoryJob() throws BatchException {
Set<JobExecution> orderRunningJob = jobExplorer.findRunningJobExecutions("orderJob");
Set<JobExecution> userRunningJob = jobExplorer.findRunningJobExecutions("userJob");
if (!orderRunningJob.isEmpty() || !userRunningJob.isEmpty()) {
throw new BatchException(" Order/user Job is running state , cleanup job is aborted ");
}
mapJobRepositoryFactoryBean.clear();
}
}
I have scheduled the two jobs to run every 15 minutes to perform some business logic, and I have also scheduled an in-memory cleanup job that clears the in-memory job data held by the "mapJobRepositoryFactoryBean" bean (if any), provided neither of the two jobs is in a running state.
I am looking for suggestions on the best way to delete old jobs that have already finished executing; the approach above will not remove the old job details if either job is in a running state.
Or is there any API in spring-batch to clear a specific job's details from memory after the job has executed, i.e. clear the memory by JobId?
Note: I want to use MapJobRepositoryFactoryBean only, not a persistent database or any embedded database (H2).
MapJobRepository provides a clear() method that clears all of the data in the map-based job repository, but I don't see any obvious way to remove the metadata of a specific job.
I want to go with MapJobRepositoryFactoryBean only, not a persistent database or any embedded database (H2)
I would really recommend using a JDBC-based job repository with an in-memory database. This approach is better because it lets you run queries against the in-memory database and delete the data of a specific job.
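For illustration, here is a minimal sketch of that suggestion, assuming Spring Batch 4.x with H2 and spring-jdbc on the classpath. The InMemoryJdbcJobConfig class name and the deleteJobMetadata helper are hypothetical (not part of Spring Batch); only the BATCH_* tables and the schema script come from Spring Batch itself.
// Example sketch: JDBC-based job repository backed by an embedded H2 database
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class InMemoryJdbcJobConfig {
    @Bean
    public DataSource dataSource() {
        // Embedded H2 database; Spring Batch's schema script creates the BATCH_* metadata tables.
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .build();
    }
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
    // Hypothetical helper: removes all metadata of one job (by job name) with plain SQL.
    // Child tables are deleted first to satisfy the foreign-key constraints; call it only
    // when the job is not running, as already checked in scheduleCleanupMemoryJob().
    public static void deleteJobMetadata(JdbcTemplate jdbc, String jobName) {
        String executionIds = "SELECT JE.JOB_EXECUTION_ID FROM BATCH_JOB_EXECUTION JE"
                + " JOIN BATCH_JOB_INSTANCE JI ON JE.JOB_INSTANCE_ID = JI.JOB_INSTANCE_ID"
                + " WHERE JI.JOB_NAME = ?";
        jdbc.update("DELETE FROM BATCH_STEP_EXECUTION_CONTEXT WHERE STEP_EXECUTION_ID IN"
                + " (SELECT STEP_EXECUTION_ID FROM BATCH_STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (" + executionIds + "))", jobName);
        jdbc.update("DELETE FROM BATCH_STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (" + executionIds + ")", jobName);
        jdbc.update("DELETE FROM BATCH_JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID IN (" + executionIds + ")", jobName);
        jdbc.update("DELETE FROM BATCH_JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID IN (" + executionIds + ")", jobName);
        jdbc.update("DELETE FROM BATCH_JOB_EXECUTION WHERE JOB_INSTANCE_ID IN"
                + " (SELECT JOB_INSTANCE_ID FROM BATCH_JOB_INSTANCE WHERE JOB_NAME = ?)", jobName);
        jdbc.update("DELETE FROM BATCH_JOB_INSTANCE WHERE JOB_NAME = ?", jobName);
    }
}
With @EnableBatchProcessing and a DataSource present, Spring Batch wires a JDBC-based JobRepository, JobExplorer and JobLauncher automatically, so the Map-based beans are no longer needed, and the existing cleanup schedule could call a helper like deleteJobMetadata("userJob") instead of mapJobRepositoryFactoryBean.clear().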