我如何使用 Spring Batch with OSGI
How can I use Spring Batch with OSGI
我想使用 Spring 批处理和 osgi 每天 运行 一个作业。
这是我所做的:
@Component
@EnableBatchProcessing
public class BatchConfiguration {
private JobBuilderFactory jobs;
public JobBuilderFactory getJobs() {
return jobs;
}
public void setJobs(JobBuilderFactory jobs) {
this.jobs = jobs;
}
private StepBuilderFactory steps;
private EmployeeRepository employeeRepository; //spring data repository
public EmployeeRepository getEmployeeRepository() {
return employeeRepository;
}
@Reference
public void setEmployeeRepository(EmployeeRepository employeeRepository) {
employeeRepository= employeeRepository;
}
public Step syncEmployeesStep() throws Exception{
RepositoryItemWriter writer = new RepositoryItemWriter();
writer.setRepository(employeeRepository);
writer.setMethodName("save");
return steps.get("syncEmployeesStep")
.<Employee, Employee> chunk(10)
.reader(reader())
.writer(writer)
.build();
}
public Job importEmpJob()throws Exception {
return jobs.get("importEmpJob")
.incrementer(new RunIdIncrementer())
.start(syncEmployeesStep())
.next(syncEmployeesStep())
.build();
}
public ItemReader<Employee> reader() throws Exception {
String jpqlQuery = "select a from Employee a";
ServerEMF entityManager = new ServerEMF();
JpaPagingItemReader<Employee> reader = new JpaPagingItemReader<Tariff>();
reader.setQueryString(jpqlQuery);
reader.setEntityManagerFactory(entityManager.getEntityManagerFactory());
reader.setPageSize(3);
reader.afterPropertiesSet();
reader.setSaveState(true);
return reader;
}
}
这里我想运行这个作业在两个数据库之间同步,我的问题是如何运行这个作业在osgi里面。
@EnableScheduling
@Component
public class JobRunner {
private JobLauncher jobLauncher;
private Job job ;
private BatchConfiguration batchConfig;
//private JobBuilderFactory jobs;
//private JobRepository jobrepo;
final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);
BundleContext ctx;
@SuppressWarnings("rawtypes")
ServiceTracker servicetracker;
@Activate
public void start(BundleContext context) {
batchConfig = new BatchConfiguration();
//jobs = new JobBuilderFactory(jobRepository)
try {
job = batchConfig.importEmpJob(); //job is null because i don't know how to use it
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ctx = context;
servicetracker= new ServiceTracker(ctx, BatchConfiguration.class, null);
servicetracker.open();
new Thread() {
public void run() { findAndRunJob(); }
}.start();
}
@Deactivate
public void stop() {
configAdminTracker.close();
}
@Scheduled(fixedRate = 5000)
protected void findAndRunJob() {
logger.info("job created.");
try {
String dateParam = new Date().toString();
JobParameters param = new JobParametersBuilder().addString("date", dateParam).toJobParameters();
System.out.println(dateParam);
JobExecution execution = jobLauncher.run(job, param);
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
//e.printStackTrace();
}
}
当然,我得到了一个 java.lang.NullPointerException 因为作业是空的。
有人可以帮我吗?
更新后
@Component
@EnableBatchProcessing
public class BatchConfiguration {
private EmployeeRepository employeeRepository; //spring data repository
public EmployeeRepository getEmployeeRepository() {
return employeeRepository;
}
@Reference
public void setEmployeeRepository(EmployeeRepository employeeRepository) {
employeeRepository= employeeRepository;
}
public Step syncEmployeesStep() throws Exception{
RepositoryItemWriter writer = new RepositoryItemWriter();
writer.setRepository(employeeRepository);
writer.setMethodName("save");
return steps.get("syncEmployeesStep")
.<Employee, Employee> chunk(10)
.reader(reader())
.writer(writer)
.build();
}
public Job importEmpJob(JobRepository jobRepository, PlatformTransactionManager transactionManager)throws Exception {
JobBuilderFactory jobs= new JobBuilderFactory(jobRepository);
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
return jobs.get("importEmpJob")
.incrementer(new RunIdIncrementer())
.start(syncEmployeesStep())
.next(syncEmployeesStep())
.build();
}
public ItemReader<Employee> reader() throws Exception {
String jpqlQuery = "select a from Employee a";
ServerEMF entityManager = new ServerEMF();
JpaPagingItemReader<Employee> reader = new JpaPagingItemReader<Tariff>();
reader.setQueryString(jpqlQuery);
reader.setEntityManagerFactory(entityManager.getEntityManagerFactory());
reader.setPageSize(3);
reader.afterPropertiesSet();
reader.setSaveState(true);
return reader;
}
}
工作运行工作class
private JobLauncher jobLauncher;
private PlatformTransactionManager transactionManager;
private JobRepository jobRepository;
Job importEmpJob;
private BatchConfiguration batchConfig;
@SuppressWarnings("deprecation")
@Activate
public void start(BundleContext context) {
try {
batchConfig = new BatchConfiguration();
this.transactionManager = new ResourcelessTransactionManager();
MapJobRepositoryFactoryBean repositorybean = new MapJobRepositoryFactoryBean();
repositorybean.setTransactionManager(transactionManager);
this.jobRepository = repositorybean.getJobRepository(); //error after executing this statement
// setup job launcher
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
simpleJobLauncher.setJobRepository(jobRepository);
this.jobLauncher = simpleJobLauncher;
//System.out.println(job);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ctx = context;
configAdminTracker = new ServiceTracker(ctx, BatchConfiguration.class.getName(), null);
configAdminTracker.open();
new Thread() {
public void run() { findAndRunJob(); }
}.start();
}
@Deactivate
public void stop() {
configAdminTracker.close();
}
protected void findAndRunJob() {
logger.info("job created.");
try {
String dateParam = new Date().toString();
// creating the job
Job job = batchConfig.importEmpJob(jobRepository, transactionManager);
// running the job
JobExecution execution = this.jobLauncher.run(job, new JobParameters());
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
//e.printStackTrace();
}
}
我在 运行ning 之后得到的是 "java.lang.IllegalArgumentException: interface org.springframework.batch.core.repository.JobRepository is not visible from class loader"。谁能帮我解决这个错误?
简而言之
如果您只是想开始一些简单的事情并且不需要所有 Spring 批处理功能,我会研究 Apache Sling Commons Scheduler,它在 Quartz 之上有一个简单的作业处理器用于安排[1].
一般
这里有几个注意事项,具体取决于您要执行的操作。您是否将 Spring Batch Jars 部署到 OSGi 容器,并假设为作业(步骤、任务等)编写的代码将存在于单独的包中? OSGi 的目的是开发模块化代码,所以我的回答是假设这是您的最终目标。 Pivotal 的工作人员已不再要求在他们的工件上支持 OSGi,因此要使其正常工作,您需要确定需要从 Batch jar 文件中导出哪些内容。这可以用 BND 来完成。我建议检查新的 BND Maven 插件 [2]。我将配置 Export-Package 以导出编写作业所需的接口,以便您可以在单独的模块化包中编写作业。然后我可能会将 spring 批处理 jar 嵌入一个包中,并在 JobLauncher 周围编写一个小包装器。这应该包含单个类加载器的所有实际批处理代码,这样您就不必担心 OSGi 会尝试动态引入 类。缺点是这会阻止您在您创建的 spring 批处理包之外使用许多批处理注释,但会提供您通过使用 OSGi 实现此类解决方案而寻求的模块化。
[1] https://sling.apache.org/documentation/bundles/apache-sling-eventing-and-job-handling.html
[2] http://njbartlett.name/2015/03/27/announcing-bnd-maven-plugin.html
我想使用 Spring 批处理和 osgi 每天 运行 一个作业。 这是我所做的:
@Component
@EnableBatchProcessing
public class BatchConfiguration {
private JobBuilderFactory jobs;
public JobBuilderFactory getJobs() {
return jobs;
}
public void setJobs(JobBuilderFactory jobs) {
this.jobs = jobs;
}
private StepBuilderFactory steps;
private EmployeeRepository employeeRepository; //spring data repository
public EmployeeRepository getEmployeeRepository() {
return employeeRepository;
}
@Reference
public void setEmployeeRepository(EmployeeRepository employeeRepository) {
employeeRepository= employeeRepository;
}
public Step syncEmployeesStep() throws Exception{
RepositoryItemWriter writer = new RepositoryItemWriter();
writer.setRepository(employeeRepository);
writer.setMethodName("save");
return steps.get("syncEmployeesStep")
.<Employee, Employee> chunk(10)
.reader(reader())
.writer(writer)
.build();
}
public Job importEmpJob()throws Exception {
return jobs.get("importEmpJob")
.incrementer(new RunIdIncrementer())
.start(syncEmployeesStep())
.next(syncEmployeesStep())
.build();
}
public ItemReader<Employee> reader() throws Exception {
String jpqlQuery = "select a from Employee a";
ServerEMF entityManager = new ServerEMF();
JpaPagingItemReader<Employee> reader = new JpaPagingItemReader<Tariff>();
reader.setQueryString(jpqlQuery);
reader.setEntityManagerFactory(entityManager.getEntityManagerFactory());
reader.setPageSize(3);
reader.afterPropertiesSet();
reader.setSaveState(true);
return reader;
}
}
这里我想运行这个作业在两个数据库之间同步,我的问题是如何运行这个作业在osgi里面。
@EnableScheduling
@Component
public class JobRunner {
private JobLauncher jobLauncher;
private Job job ;
private BatchConfiguration batchConfig;
//private JobBuilderFactory jobs;
//private JobRepository jobrepo;
final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);
BundleContext ctx;
@SuppressWarnings("rawtypes")
ServiceTracker servicetracker;
@Activate
public void start(BundleContext context) {
batchConfig = new BatchConfiguration();
//jobs = new JobBuilderFactory(jobRepository)
try {
job = batchConfig.importEmpJob(); //job is null because i don't know how to use it
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ctx = context;
servicetracker= new ServiceTracker(ctx, BatchConfiguration.class, null);
servicetracker.open();
new Thread() {
public void run() { findAndRunJob(); }
}.start();
}
@Deactivate
public void stop() {
configAdminTracker.close();
}
@Scheduled(fixedRate = 5000)
protected void findAndRunJob() {
logger.info("job created.");
try {
String dateParam = new Date().toString();
JobParameters param = new JobParametersBuilder().addString("date", dateParam).toJobParameters();
System.out.println(dateParam);
JobExecution execution = jobLauncher.run(job, param);
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
//e.printStackTrace();
}
}
当然,我得到了一个 java.lang.NullPointerException 因为作业是空的。 有人可以帮我吗?
更新后
@Component
@EnableBatchProcessing
public class BatchConfiguration {
private EmployeeRepository employeeRepository; //spring data repository
public EmployeeRepository getEmployeeRepository() {
return employeeRepository;
}
@Reference
public void setEmployeeRepository(EmployeeRepository employeeRepository) {
employeeRepository= employeeRepository;
}
public Step syncEmployeesStep() throws Exception{
RepositoryItemWriter writer = new RepositoryItemWriter();
writer.setRepository(employeeRepository);
writer.setMethodName("save");
return steps.get("syncEmployeesStep")
.<Employee, Employee> chunk(10)
.reader(reader())
.writer(writer)
.build();
}
public Job importEmpJob(JobRepository jobRepository, PlatformTransactionManager transactionManager)throws Exception {
JobBuilderFactory jobs= new JobBuilderFactory(jobRepository);
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
return jobs.get("importEmpJob")
.incrementer(new RunIdIncrementer())
.start(syncEmployeesStep())
.next(syncEmployeesStep())
.build();
}
public ItemReader<Employee> reader() throws Exception {
String jpqlQuery = "select a from Employee a";
ServerEMF entityManager = new ServerEMF();
JpaPagingItemReader<Employee> reader = new JpaPagingItemReader<Tariff>();
reader.setQueryString(jpqlQuery);
reader.setEntityManagerFactory(entityManager.getEntityManagerFactory());
reader.setPageSize(3);
reader.afterPropertiesSet();
reader.setSaveState(true);
return reader;
}
}
工作运行工作class
private JobLauncher jobLauncher;
private PlatformTransactionManager transactionManager;
private JobRepository jobRepository;
Job importEmpJob;
private BatchConfiguration batchConfig;
@SuppressWarnings("deprecation")
@Activate
public void start(BundleContext context) {
try {
batchConfig = new BatchConfiguration();
this.transactionManager = new ResourcelessTransactionManager();
MapJobRepositoryFactoryBean repositorybean = new MapJobRepositoryFactoryBean();
repositorybean.setTransactionManager(transactionManager);
this.jobRepository = repositorybean.getJobRepository(); //error after executing this statement
// setup job launcher
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
simpleJobLauncher.setJobRepository(jobRepository);
this.jobLauncher = simpleJobLauncher;
//System.out.println(job);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ctx = context;
configAdminTracker = new ServiceTracker(ctx, BatchConfiguration.class.getName(), null);
configAdminTracker.open();
new Thread() {
public void run() { findAndRunJob(); }
}.start();
}
@Deactivate
public void stop() {
configAdminTracker.close();
}
protected void findAndRunJob() {
logger.info("job created.");
try {
String dateParam = new Date().toString();
// creating the job
Job job = batchConfig.importEmpJob(jobRepository, transactionManager);
// running the job
JobExecution execution = this.jobLauncher.run(job, new JobParameters());
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
//e.printStackTrace();
}
}
我在 运行ning 之后得到的是 "java.lang.IllegalArgumentException: interface org.springframework.batch.core.repository.JobRepository is not visible from class loader"。谁能帮我解决这个错误?
简而言之
如果您只是想开始一些简单的事情并且不需要所有 Spring 批处理功能,我会研究 Apache Sling Commons Scheduler,它在 Quartz 之上有一个简单的作业处理器用于安排[1].
一般
这里有几个注意事项,具体取决于您要执行的操作。您是否将 Spring Batch Jars 部署到 OSGi 容器,并假设为作业(步骤、任务等)编写的代码将存在于单独的包中? OSGi 的目的是开发模块化代码,所以我的回答是假设这是您的最终目标。 Pivotal 的工作人员已不再要求在他们的工件上支持 OSGi,因此要使其正常工作,您需要确定需要从 Batch jar 文件中导出哪些内容。这可以用 BND 来完成。我建议检查新的 BND Maven 插件 [2]。我将配置 Export-Package 以导出编写作业所需的接口,以便您可以在单独的模块化包中编写作业。然后我可能会将 spring 批处理 jar 嵌入一个包中,并在 JobLauncher 周围编写一个小包装器。这应该包含单个类加载器的所有实际批处理代码,这样您就不必担心 OSGi 会尝试动态引入 类。缺点是这会阻止您在您创建的 spring 批处理包之外使用许多批处理注释,但会提供您通过使用 OSGi 实现此类解决方案而寻求的模块化。
[1] https://sling.apache.org/documentation/bundles/apache-sling-eventing-and-job-handling.html
[2] http://njbartlett.name/2015/03/27/announcing-bnd-maven-plugin.html