Batch with Spring Boot & JPA - use in-memory datasource for batch-related tables

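Context

I am trying to develop a batch service with Spring Boot, using JPA repositories. Using two different datasources, I want the batch-related tables to be created in an in-memory database so that they do not pollute my business database.

Following several threads on the web, I came up with this configuration for my two datasources: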

@Configuration
public class DataSourceConfiguration {

    @Bean(name = "mainDataSource")
    @Primary
    @ConfigurationProperties(prefix="spring.datasource")
    public DataSource mainDataSource(){
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "batchDataSource")
    public DataSource batchDataSource( @Value("${batch.datasource.url}") String url ){
        return DataSourceBuilder.create().url( url ).build();
    }   
}

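The first one, mainDataSource, uses the default Spring database configuration. The batchDataSource defines an embedded HSQL database, in which I want the batch and step tables to be created.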

# DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)
spring.datasource.url=jdbc:mariadb://localhost:3306/batch_poc
spring.datasource.username=root
spring.datasource.password=
spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
spring.datasource.max-age=10000

spring.datasource.initialize=false

# JPA (JpaBaseConfiguration, HibernateJpaAutoConfiguration)
spring.jpa.generate-ddl=false
spring.jpa.show-sql=true
spring.jpa.database=MYSQL

# SPRING BATCH (BatchDatabaseInitializer)
spring.batch.initializer.enabled=false

# ----------------------------------------
# PROJECT SPECIFIC PROPERTIES
# ----------------------------------------

# BATCH DATASOURCE
batch.datasource.url=jdbc:hsqldb:file:C:/tmp/hsqldb/batchdb

Here is my batch configuration:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    private static final Logger LOG = Logger.getLogger( BatchConfiguration.class );

    @Bean
    public BatchConfigurer configurer(){
        return new CustomBatchConfigurer();
    }

    @Bean
    public Job importElementsJob( JobBuilderFactory jobs, Step step1 ){
        return jobs.get("importElementsJob")
                .incrementer( new RunIdIncrementer() )
                .flow( step1 )
                .end()
                .build();               
    }

    @Bean
    public Step step1( StepBuilderFactory stepBuilderFactory, ItemReader<InputElement> reader,
            ItemWriter<List<Entity>> writer, ItemProcessor<InputElement, List<Entity>> processor ){

        return stepBuilderFactory.get("step1")
                .<InputElement, List<Entity>> chunk(100)
                .reader( reader )
                .processor( processor )
                .writer( writer )
                .build();
    }

    @Bean
    public ItemReader<InputElement> reader() throws IOException {       
        return new CustomItemReader();
    }

    @Bean
    public ItemProcessor<InputElement, List<Entity>> processor(){
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<List<Entity>> writer(){
        return new CustomItemWriter();
    }

}

The BatchConfigurer, using the in-memory database:

public class CustomBatchConfigurer extends DefaultBatchConfigurer {

    @Override
    @Autowired
    public void setDataSource( @Qualifier("batchDataSource") DataSource dataSource) {
        super.setDataSource(dataSource);
    }

}

And finally, my writer:

public class CustomItemWriter implements ItemWriter<List<Entity>> {

    private static final Logger LOG = Logger.getLogger( CustomItemWriter.class );

    @Autowired
    private EntityRepository entityRepository;

    @Override
    public void write(List<? extends List<Entity>> items)
            throws Exception {
        if( items != null && !items.isEmpty() ){

            for( List<Entity> entities : items ){
                for( Entity entity : entities ){                        
                    Entity fromDb = entityRepository.findById( entity.getId() );

                    // Insert
                    if( fromDb == null ){
                        entityRepository.save( entity );
                    }

                    // Update
                    else {
                        // TODO : entityManager.merge()
                    }
                }
            }

        }
    }

}

The EntityRepository interface extends JpaRepository.
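
The update branch in the writer above is still a TODO. As a minimal sketch of the insert-or-update flow over the nested chunk of lists, here is a plain-Java version with no Spring dependency. SketchEntity and InMemoryEntityRepository are hypothetical stand-ins for Entity and EntityRepository, and the label field is invented purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Entity class of the question.
final class SketchEntity {
    final long id;
    String label; // invented field, only here to make an update observable

    SketchEntity(long id, String label) {
        this.id = id;
        this.label = label;
    }
}

// Hypothetical in-memory stand-in for EntityRepository (not a Spring Data type).
final class InMemoryEntityRepository {
    private final Map<Long, SketchEntity> rows = new LinkedHashMap<>();

    SketchEntity findById(long id) { return rows.get(id); }

    void save(SketchEntity entity) { rows.put(entity.id, entity); }

    int count() { return rows.size(); }
}

// Mirrors the shape of CustomItemWriter.write(): each item of the chunk is itself a list.
final class UpsertWriterSketch {
    private final InMemoryEntityRepository repository;
    int inserted;
    int updated;

    UpsertWriterSketch(InMemoryEntityRepository repository) {
        this.repository = repository;
    }

    void write(List<? extends List<SketchEntity>> items) {
        if (items == null) {
            return;
        }
        for (List<SketchEntity> entities : items) {
            for (SketchEntity entity : entities) {
                SketchEntity fromDb = repository.findById(entity.id);
                if (fromDb == null) {
                    // Insert: the id is not known yet.
                    repository.save(entity);
                    inserted++;
                } else {
                    // Update: copy the new state onto the stored row, then save it.
                    fromDb.label = entity.label;
                    repository.save(fromDb);
                    updated++;
                }
            }
        }
    }
}
```

With Spring Data JPA itself, save() should already persist a new entity and merge an existing one, so the explicit lookup may turn out to be unnecessary there.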

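Problem

When I separate the datasources this way, nothing happens when I call the repository's save method. In the logs I can see the select queries issued by the findById() calls, but nothing for the save. At the end, my output database is empty.

When I go back to a single datasource configuration (removing the configurer bean and letting Spring Boot manage the datasource on its own), the insert queries work fine.

Maybe the main datasource configuration is not good enough for JPA to perform the inserts correctly. But what is missing?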

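I finally solved my problem by implementing my own BatchConfigurer, modeled on the Spring class BasicBatchConfigurer, and by forcing the use of a Map-based jobRepository and jobExplorer. No more custom datasource configuration: there is only one datasource, which I let Spring Boot manage. It is easier that way.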
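
A likely explanation for the missing inserts, offered as an assumption rather than something confirmed in this thread: when DefaultBatchConfigurer is given a DataSource, it builds a JDBC DataSourceTransactionManager on that datasource, so the step's chunk transactions run against the batch database and never commit the JPA persistence context that save() writes into. The plain-Java toy model below only illustrates that mismatch. Every type in it is a hypothetical stand-in, not a Spring class.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a JPA persistence context: persist() only queues the row;
// nothing reaches the "database" until flush() is called.
final class ToyPersistenceContext {
    private final List<String> pending = new ArrayList<>();
    private final List<String> database = new ArrayList<>();

    void persist(String row) { pending.add(row); }

    void flush() {
        database.addAll(pending);
        pending.clear();
    }

    int rowsInDatabase() { return database.size(); }
}

// Hypothetical, simplified transaction manager contract (not Spring's interface).
interface ToyTransactionManager {
    void commit();
}

// Plays the role of a JPA-aware manager: committing flushes the persistence context.
final class ToyJpaTransactionManager implements ToyTransactionManager {
    private final ToyPersistenceContext context;

    ToyJpaTransactionManager(ToyPersistenceContext context) { this.context = context; }

    @Override
    public void commit() { context.flush(); }
}

// Plays the role of a JDBC-only manager bound to the batch datasource:
// it commits its own connection and never touches the persistence context.
final class ToyBatchJdbcTransactionManager implements ToyTransactionManager {
    @Override
    public void commit() { /* would commit batch metadata only */ }
}

final class TransactionMismatchDemo {
    // Runs one "chunk" of two saves and returns how many rows were actually written.
    static int rowsWrittenByChunk(boolean stepUsesJpaManager) {
        ToyPersistenceContext context = new ToyPersistenceContext();
        ToyTransactionManager stepManager = stepUsesJpaManager
                ? new ToyJpaTransactionManager(context)
                : new ToyBatchJdbcTransactionManager();
        context.persist("entity-1");
        context.persist("entity-2");
        stepManager.commit();
        return context.rowsInDatabase();
    }

    public static void main(String[] args) {
        System.out.println("JPA manager:  " + rowsWrittenByChunk(true));
        System.out.println("JDBC manager: " + rowsWrittenByChunk(false));
    }
}
```

In this model, only the JPA-aware manager makes the queued rows visible, which matches what I see once the step runs under a JpaTransactionManager.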

My custom BatchConfigurer:

public class CustomBatchConfigurer implements BatchConfigurer {

    private static final Logger LOG = Logger.getLogger( CustomBatchConfigurer.class );

    private final EntityManagerFactory entityManagerFactory;

    private PlatformTransactionManager transactionManager;

    private JobRepository jobRepository;

    private JobLauncher jobLauncher;

    private JobExplorer jobExplorer;

    /**
     * Create a new {@link CustomBatchConfigurer} instance.
     * @param entityManagerFactory the entity manager factory
     */
    public CustomBatchConfigurer( EntityManagerFactory entityManagerFactory ) {
        this.entityManagerFactory = entityManagerFactory;
    }

    @Override
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    @Override
    public JobLauncher getJobLauncher() {
        return this.jobLauncher;
    }

    @Override
    public JobExplorer getJobExplorer() throws Exception {
        return this.jobExplorer;
    }

    @PostConstruct
    public void initialize() {
        try {
            // transactionManager:
            LOG.info("Forcing the use of a JPA transactionManager");
            if( this.entityManagerFactory == null ){
                throw new Exception("Unable to initialize batch configurer : entityManagerFactory must not be null");
            }
            this.transactionManager = new JpaTransactionManager( this.entityManagerFactory );

            // jobRepository:
            LOG.info("Forcing the use of a Map based JobRepository");
            MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean( this.transactionManager );
            jobRepositoryFactory.afterPropertiesSet();
            this.jobRepository = jobRepositoryFactory.getObject();

            // jobLauncher:
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(getJobRepository());
            jobLauncher.afterPropertiesSet();
            this.jobLauncher = jobLauncher;

            // jobExplorer:
            MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
            jobExplorerFactory.afterPropertiesSet();
            this.jobExplorer = jobExplorerFactory.getObject();
        }
        catch (Exception ex) {
            throw new IllegalStateException("Unable to initialize Spring Batch", ex);
        }
    }

}

My configuration class now looks like this:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Bean
    public BatchConfigurer configurer( EntityManagerFactory entityManagerFactory ){
        return new CustomBatchConfigurer( entityManagerFactory );
    }

    [...]

}

And my properties file:

# DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)
spring.datasource.url=jdbc:mariadb://localhost:3306/inotr_poc
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
spring.datasource.max-age=10000

spring.datasource.initialize=true

# JPA (JpaBaseConfiguration, HibernateJpaAutoConfiguration)
spring.jpa.generate-ddl=false
spring.jpa.show-sql=true
spring.jpa.database=MYSQL

# SPRING BATCH (BatchDatabaseInitializer)
spring.batch.initializer.enabled=false

Eria's answer worked! However, I modified it to use:

org.springframework.batch.support.transaction.ResourcelessTransactionManager

in the CustomBatchConfigurer:

    @PostConstruct
    public void initialize() {
        try {
            // transactionManager:
            LOGGER.info("Forcing the use of ResourcelessTransactionManager for batch db");

            this.transactionManager = new ResourcelessTransactionManager(); 
            //the rest of the code follows... 
    }

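Thanks for the posts above! For the past couple of days I have been struggling to get my Spring Boot with Batch application to work with an in-memory Map-based job repository and a resourceless transaction manager. (I cannot let Spring Batch use my application datasource for the batch metadata tables, as I do not have the DDL access needed to create the BATCH_ tables there.) After reading the posts above, I finally arrived at the configuration below, and it works perfectly!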

public class CustomBatchConfigurer implements BatchConfigurer {

    private static final Logger LOG = LoggerFactory.getLogger(CustomBatchConfigurer.class);

   // private final EntityManagerFactory entityManagerFactory;

    private PlatformTransactionManager transactionManager;

    private JobRepository jobRepository;

    private JobLauncher jobLauncher;

    private JobExplorer jobExplorer;

    /*
     * Constructor from the previous answer, commented out because the
     * EntityManagerFactory is no longer used here:
     *
     * public CustomBatchConfigurer( EntityManagerFactory entityManagerFactory ) {
     *     this.entityManagerFactory = entityManagerFactory;
     * }
     */

    @Override
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    @Override
    public JobLauncher getJobLauncher() {
        return this.jobLauncher;
    }

    @Override
    public JobExplorer getJobExplorer() throws Exception {
        return this.jobExplorer;
    }

    @PostConstruct
    public void initialize() {
        try {
            // transactionManager:
            LOG.info("Forcing the use of a Resourceless transactionManager");

            this.transactionManager = new ResourcelessTransactionManager();

            // jobRepository:
            LOG.info("Forcing the use of a Map based JobRepository");
            MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean( this.transactionManager );
            jobRepositoryFactory.afterPropertiesSet();
            this.jobRepository = jobRepositoryFactory.getObject();

            // jobLauncher:
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(getJobRepository());
            jobLauncher.afterPropertiesSet();
            this.jobLauncher = jobLauncher;

            // jobExplorer:
            MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
            jobExplorerFactory.afterPropertiesSet();
            this.jobExplorer = jobExplorerFactory.getObject();
        }
        catch (Exception ex) {
            throw new IllegalStateException("Unable to initialize Spring Batch", ex);
        }
    }

}

Below is the bean I added to my job configuration class:

@Bean
public BatchConfigurer configurer(){
    return new CustomBatchConfigurer();
}