Spring 重负载批处理多进程,每个进程下多线程

Spring Batch multiple process for heavy load with multiple thread under every process

我有一个场景,我需要有大约 50-60 个不同的进程 运行 同时执行一个任务。

每个进程都必须使用 sql 查询从数据库中获取数据,方法是传递一个值并在后续任务中获取数据 运行。 select col_1、col_2、col_3 来自 table_1 其中 col_1 = :Process_1;

 @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("parallelJob")
                .incrementer(new RunIdIncrementer())
                .flow(masterStep())
                .end()
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        //How to fetch data from configuration and pass all values in partitioner one by one.
        // Can we give the name for every process so that it is helpful in logs and monitoring.
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep())
                .partitioner("partition", partitioner())
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws Exception {
        //Hit DB with sql query and fetch the data.

    }

    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<Map<String, String>, Map<String, String>>chunk(1)
                .processTask()
                .build();
    }

由于我们在 Apache Camel 中有 Aggregator 和 parallelProcessing,Spring Batch 是否有任何类似的功能可以完成相同的工作?

我是 Spring Batch 的新手,目前正在探索它是否可以处理该卷。 因为这将是一个负载很重的应用程序 运行ning 24*7 并且每个进程都需要 运行 同时进行,其中每个线程都应该能够支持一个进程内的多个线程。

有没有办法监视这些进程,以便它无论如何都会终止,我应该能够重新启动该特定进程? 请帮忙解决这个问题。

请找出以上问题的答案。

  1. parallelProcessing - 本地和远程分区支持并行处理,可以处理大量数据,因为我们目前每天处理 200 到 3 亿条数据。

  2. 是否可以处理大容量 - 是的,这可以处理大容量并且已得到充分证明。

  3. 每个进程都需要 运行 同时进行,其中每个线程都应该能够支持一个进程内的多个线程 - Spring 批处理将根据您的线程池进行处理。确保根据系统资源配置池。

  4. 有没有一种方法可以监视这些进程以使其终止 - 是的。分区的每个并行过程都是一个步骤,您可以在 BATCH_STEP_EXECUTION 中进行监控并了解所有详细信息

  5. 应该能够重新启动该特定进程 - 是的,这是一个内置功能,可以从失败的步骤重新启动。大量作业我们总是使用容错,以便稍后处理拒绝。这也是内置功能。

下面的示例项目

https://github.com/ngecom/springBatchLocalParition/tree/master

已添加数据库 - H2 并创建 table 在资源文件夹中可用。我们总是更喜欢使用数据源池,池大小将大于您的线程池大小。

示例项目总结

  1. 从table“customer”中读取并划分为step partitions
  2. 每步分区写入新的table "new_customer"
  3. JobConfiguration.java 方法名称“taskExecutor()”
  4. 中可用的线程池配置
  5. slaveStep() 中可用的块大小。
  6. 您可以根据并行步骤计算内存大小并配置为 VM 最大内存。

查询执行后根据你上面的问题帮你分析

SELECT * FROM NEW_CUSTOMER;   
SELECT * FROM BATCH_JOB_EXECUTION bje;
SELECT * FROM BATCH_STEP_EXECUTION bse WHERE JOB_EXECUTION_ID=2; 
SELECT * FROM BATCH_STEP_EXECUTION_CONTEXT bsec WHERE STEP_EXECUTION_ID=4; 

如果你想更改为 MYSQL 添加下面作为数据源

spring.datasource.hikari.minimum-idle=5 
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.idle-timeout=600000 
spring.datasource.hikari.max-lifetime=1800000 
spring.datasource.hikari.auto-commit=true 
spring.datasource.hikari.poolName=SpringBoot-HikariCP
spring.datasource.url=jdbc:mysql://localhost:3306/ngecomdev
spring.datasource.username=ngecom
spring.datasource.password=ngbilling

请始终参考下面的 guthub URL。你会从中得到很多灵感。

https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples