Spring 批处理线程安全地图作业存储库
Spring Batch thread-safe Map job repository
Spring Batch docs Map-backed 作业存储库的说法:
Note that the in-memory repository is volatile and so does not allow restart between JVM instances. It also cannot guarantee that two job instances with the same parameters are launched simultaneously, and is not suitable for use in a multi-threaded Job, or a locally partitioned Step. So use the database version of the repository wherever you need those features.
我想使用 Map 作业存储库,我不关心重新启动、防止并发作业执行等,但我 关心能够使用多线程和本地分区。
我的批处理应用程序有一些分区步骤,乍一看似乎 运行 使用 Map-backed 作业存储库就好了。
它说 MapJobRepositoryFactoryBean 不可能的原因是什么?查看 Map DAO 的实现,它们使用的是 ConcurrentHashMap。这不是线程安全的吗?
我建议您遵循文档,而不是依赖实施细节。即使映射单独是线程安全的,更改中也可能存在竞争条件,而不是涉及多个映射。
您可以非常轻松地使用内存数据库。示例
@Grapes([
@Grab('org.springframework:spring-jdbc:4.0.5.RELEASE'),
@Grab('com.h2database:h2:1.3.175'),
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE'),
// must be passed with -cp, for whatever reason the GroovyClassLoader
// is not used for com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver
//@Grab('org.codehaus.jettison:jettison:1.2'),
])
import org.h2.jdbcx.JdbcDataSource
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.ResourceLoader
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator
import javax.annotation.PostConstruct
import javax.sql.DataSource
@Configuration
@EnableBatchProcessing
class AppConfig {
@Autowired
private JobBuilderFactory jobs
@Autowired
private StepBuilderFactory steps
@Bean
public Job job() {
return jobs.get("myJob").start(step1()).build()
}
@Bean
Step step1() {
this.steps.get('step1')
.tasklet(new MyTasklet())
.build()
}
@Bean
DataSource dataSource() {
new JdbcDataSource().with {
url = 'jdbc:h2:mem:temp_db;DB_CLOSE_DELAY=-1'
user = 'sa'
password = 'sa'
it
}
}
@Bean
BatchSchemaPopulator batchSchemaPopulator() {
new BatchSchemaPopulator()
}
}
class BatchSchemaPopulator {
@Autowired
ResourceLoader resourceLoader
@Autowired
DataSource dataSource
@PostConstruct
void init() {
def populator = new ResourceDatabasePopulator()
populator.addScript(
resourceLoader.getResource(
'classpath:/org/springframework/batch/core/schema-h2.sql'))
DatabasePopulatorUtils.execute populator, dataSource
}
}
class MyTasklet implements Tasklet {
@Override
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
println 'TEST!'
}
}
def ctx = new AnnotationConfigApplicationContext(AppConfig)
def launcher = ctx.getBean(JobLauncher)
def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
println "Status is: ${jobExecution.status}"
Spring Batch docs Map-backed 作业存储库的说法:
Note that the in-memory repository is volatile and so does not allow restart between JVM instances. It also cannot guarantee that two job instances with the same parameters are launched simultaneously, and is not suitable for use in a multi-threaded Job, or a locally partitioned Step. So use the database version of the repository wherever you need those features.
我想使用 Map 作业存储库,我不关心重新启动、防止并发作业执行等,但我 关心能够使用多线程和本地分区。
我的批处理应用程序有一些分区步骤,乍一看似乎 运行 使用 Map-backed 作业存储库就好了。
它说 MapJobRepositoryFactoryBean 不可能的原因是什么?查看 Map DAO 的实现,它们使用的是 ConcurrentHashMap。这不是线程安全的吗?
我建议您遵循文档,而不是依赖实施细节。即使映射单独是线程安全的,更改中也可能存在竞争条件,而不是涉及多个映射。
您可以非常轻松地使用内存数据库。示例
@Grapes([
@Grab('org.springframework:spring-jdbc:4.0.5.RELEASE'),
@Grab('com.h2database:h2:1.3.175'),
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE'),
// must be passed with -cp, for whatever reason the GroovyClassLoader
// is not used for com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver
//@Grab('org.codehaus.jettison:jettison:1.2'),
])
import org.h2.jdbcx.JdbcDataSource
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.ResourceLoader
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator
import javax.annotation.PostConstruct
import javax.sql.DataSource
@Configuration
@EnableBatchProcessing
class AppConfig {
@Autowired
private JobBuilderFactory jobs
@Autowired
private StepBuilderFactory steps
@Bean
public Job job() {
return jobs.get("myJob").start(step1()).build()
}
@Bean
Step step1() {
this.steps.get('step1')
.tasklet(new MyTasklet())
.build()
}
@Bean
DataSource dataSource() {
new JdbcDataSource().with {
url = 'jdbc:h2:mem:temp_db;DB_CLOSE_DELAY=-1'
user = 'sa'
password = 'sa'
it
}
}
@Bean
BatchSchemaPopulator batchSchemaPopulator() {
new BatchSchemaPopulator()
}
}
class BatchSchemaPopulator {
@Autowired
ResourceLoader resourceLoader
@Autowired
DataSource dataSource
@PostConstruct
void init() {
def populator = new ResourceDatabasePopulator()
populator.addScript(
resourceLoader.getResource(
'classpath:/org/springframework/batch/core/schema-h2.sql'))
DatabasePopulatorUtils.execute populator, dataSource
}
}
class MyTasklet implements Tasklet {
@Override
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
println 'TEST!'
}
}
def ctx = new AnnotationConfigApplicationContext(AppConfig)
def launcher = ctx.getBean(JobLauncher)
def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
println "Status is: ${jobExecution.status}"