使用 spring 批量分区处理大量数据

Processing huge data with spring batch partitioning

我正在实施 spring 批处理作业以处理数据库中的数百万条记录 table 使用分区方法如下 -

  1. 从分区程序中的 table 获取唯一的分区代码,并在执行上下文中设置相同的代码。

  2. 使用 reader、处理器和写入器创建块步骤,以根据特定分区代码处理记录。

这种方法是否合适,或者对于这种情况有更好的方法吗?由于某些分区代码可以比其他分区代码拥有更多的记录,因此记录更多的分区代码可能比记录更少的分区代码需要更多的时间来处理。

是否可以创建 partition/thread 来处理线程 1 进程 1-1000、线程 2 进程 1001-2000 等?

如何控制创建的线程数,因为分区代码可以在 100 左右,我想在 5 次迭代中只创建 20 个线程和进程?

如果一个分区出现故障会发生什么情况,是否会停止所有处理并恢复原状?

配置如下-

 <bean id="MyPartitioner" class="com.MyPartitioner" />
 <bean id="itemProcessor" class="com.MyProcessor" scope="step" />
 <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step" >
  <property name="dataSource" ref="dataSource"/>
  <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
  <property name="rowMapper">
      <bean class="com.MyRowMapper" scope="step"/>
  </property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>

<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
    </batch:tasklet>
</batch:step>
<batch:job id="myjob">
    <batch:step id="mystep">
        <batch:partition step="Step1" partitioner="MyPartitioner">
            <batch:handler grid-size="20" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>

分区程序-

public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
List<String> codes = getCodes();

for (String code : codes)
{
    ExecutionContext context = new ExecutionContext();
    context.put("code", code);
    partitionMap.put(code, context);
}
return partitionMap;}}

谢谢

我会说这是正确的方法,我不明白为什么你需要每 1000 个项目有一个线程,如果你根据唯一的分区代码进行分区并且有 1000 个项目的块你将在每个线程上有 1000 个项目的事务IMO 没问题。

  1. 除了保存唯一的分区代码外,您还可以计算 许多你有每个分区代码和分区甚至更多,通过 为每 1000 个相同的分区代码创建新的子上下文(即 具有即 2200 条记录的分区代码的方式,您将调用 3 具有上下文参数的线程:1=> partition_key=key1, skip=0, count=1000, 2=>partition_key=key1, skip=1000, count=1000 和 3=>partition_key=key1, skip=2000, count=1000) 如果那是你 想要但没有它我还是会去

  2. 线程数由 ThreadPoolTaskExecutor 控制,它在创建时传递给分区步骤。您有方法 setCorePoolSize(),您可以将其设置为 20,并且最多可以获得 20 个线程。下一个细粒度配置是 grid-size,它告诉我们将从完整的分区映射中创建多少个分区。这里是explanation of grid size。所以分区就是分工。之后,您的线程配置将定义实际处理的并发性。

  3. 如果一个分区失败,则整个分区步骤失败,并显示哪个分区失败的信息。成功的分区已完成,不会再次调用,当作业重新启动时,它将通过重做失败和未处理的分区从中断的地方开始。

希望我能回答你的所有问题,因为有很多问题。

案例 1 的示例 - 可能有错误,但只是为了了解一下:

public class MyPartitioner implements Partitioner{
@Override
public Map<String, ExecutionContext> partition(int gridSize)
{
    Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
    Map<String, int> codesWithCounts = getCodesWithCounts();

    for (Entry<String, int> codeWithCount : codesWithCounts.entrySet())
    {
        for (int i = 0; i < codeWithCount.getValue(); i + 1000){
            ExecutionContext context = new ExecutionContext();
            context.put("code", code);
            context.put("skip", i);
            context.put("count", 1000);
            partitionMap.put(code, context);
        }
    }
    return partitionMap;
}

Adn 比你翻页 1000,你从上下文中得到你应该跳过多少,在 2200 的例子中是:0、1000、2000。