Spring 批处理内存过顶

Spring batch memory through the roof

我正在处理 Spring 批处理中的严重内存问题,这是一个令人担忧的问题。
流程很简单:从 Oracle 读取,转换为另一个 object 类型,然后将其写回另一个 table。它涉及 20 万条记录。
我尝试了 HibernateCursorItemReader 和 RepositoryItemReader.

我的作业执行以下步骤:

@Bean
public Step optInMutationHistoryStep() {
    return stepBuilderFactory
            .get(STEP_NAME)
            .<BodiCmMemberEntity, AbstractMutationHistoryEntity> chunk(5)
            .reader(optInItemReader)
            .processor(optInMutationHistoryItemProcessor)
            .writer(mutationHistoryItemWriter)
            .faultTolerant()
            .skipPolicy(itemSkipPolicy)
            .skip(Exception.class)
            .listener((StepExecutionListener) optInCountListener)
            .build();

}

Reader:

@Component
public class OptInItemReaderTest extends RepositoryItemReader<BodiCmMemberEntity> {

    public OptInItemReaderTest(BodiCmMemberRepository bodiCmMemberRepository){
        setRepository(bodiCmMemberRepository);
        setMethodName("findAllOptIns");
        setPageSize(100);
        Map<String, Sort.Direction> sort = new HashMap<>();
        sort.put("member_number", Sort.Direction.ASC);
        setSort(new HashMap<>(sort));
    }

}

处理器:

@Component
@StepScope
public class OptInMutationHistoryItemProcessor implements ItemProcessor<CmMemberEntity, AbstractMutationHistoryEntity> {

    Long jobId;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution){
        jobId = stepExecution.getJobExecutionId();
    }

    private final MutationHistoryBatchFactory mutationHistoryFactory;

    public OptInMutationHistoryItemProcessor(MutationHistoryBatchFactory mutationHistoryFactory) {
        this.mutationHistoryFactory = mutationHistoryFactory;
    }

    @Override
    public AbstractMutationHistoryEntity process(CmMemberEntity cmMemberEntity){
        return mutationHistoryFactory.addMutationHistoryEntity(cmMemberEntity, jobId, OPT_IN);
    }
}

项目作者:

@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{

    public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
        setRepository(mutationHistoryRepository);
    }
}

我在处理器中使用的工厂方法:

public AbstractMutationHistoryEntity addMutationHistoryEntity(CmMemberEntity cmMemberEntity, Long jobId, JobType jobType) {
    return mutationHistoryEntityBuilder(cmMemberEntity, jobId, jobType, MutationType.ADD)
            .editionCode(cmMemberEntity.getPoleEditionCodeEntity().getEditionCode())
            .firstName(cmMemberEntity.getFirstName())
            .lastName(cmMemberEntity.getLastName())
            .streetName(cmMemberEntity.getStreetName())
            .houseNumber(cmMemberEntity.getHouseNumber())
            .box(cmMemberEntity.getBox())
            .postalCode(cmMemberEntity.getPostalCode())
            .numberPieces(DEFAULT_NUMBER_PIECES)
            .build();
}

我没有看到内存中保存的直接引用,不确定是什么原因导致内存快速增加。翻倍很快。

直觉上我觉得问题要么是检索到的结果集没有定期刷新,要么是处理器以某种方式泄漏,但不确定为什么,因为我没有保存对我创建的 objects 的引用在处理器中。

有什么建议吗?

编辑:

我的完整工作如下:

@Bean
public Job optInJob(
        OptInJobCompletionNotificationListener listener,
        @Qualifier("optInSumoStep") Step optInSumoStep,
        @Qualifier("optInMutationHistoryStep") Step optInMutationHistoryStep,
        @Qualifier("optInMirrorStep") Step optInMirrorStep) {
    return jobBuilderFactory.get(OPT_IN_JOB)
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(optInSumoStep)
            .next(optInMutationHistoryStep)
            .next(optInMirrorStep)
            .end()
            .build();
}

在第一步中,使用相同的 itemreader 将记录写入 XML。第二步,执行最初共享的步骤。
在 OpenShift 中,很明显没有任何东西被清理,而且我没有任何引用 afaik;我不知道为什么我会:

很明显,消耗确实在一段时间后趋于平缓,但 ik 一直在上升,它永远不会下降。在第一步(大约 680Mb)之后,我本以为它会有。更重要的是,我希望曲线在第一步中也能变平;将块大小增加到 100 后,每处理 100 个块就会释放内存。

HibernateCursorItemReader 非常糟糕;在第一步中它已经上升到 700Mb。 repositoryItemWriter 似乎表现更好。也许是有原因的,但我不清楚是哪一个。

我现在不能说工作结束后是否会清理任何东西;因为它处理 200k 条记录,需要一些时间,我认为它会在完成之前再次 运行 内存不足。

我担心如果我们不设法解决这个问题,我们将无法投入生产。

编辑 2:

有趣的是,在第 2 步中,曲线已经变平到内存消耗没有增加的程度。这很有趣;我目前无法说出为什么是 'now'.

目前的希望/期望是第 3 步不会增加内存消耗,并且在作业完成后清理内存。 看来插入速度已经大大减慢了。我估计是 3 倍(第二步大约有 110k 条记录)。

编辑 3: 在 itemwriter 上应用 flush 对减少内存消耗或速度没有任何作用:

其实这次是运行直奔我的极限,但是我无法解释:

你可以清楚地看到这个过程是如何变慢的,只是读取、转换、写入记录。

我目前不知道为什么,但对于批处理应用程序,我不认为这是我们将其移至生产环境的可接受行为table。两批 运行 之后,它就会变得陈旧。

感谢@Mahmoud Ben Hassine 和@M,问题得到解决(或至少达到可接受的水平)。 Deinum.
itemWriters 如下所示:

@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{

    @PersistenceContext
    private EntityManager entityManager;

    private final MutationHistoryRepository mutationHistoryRepository;

    public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
        this.mutationHistoryRepository = mutationHistoryRepository;
        setRepository(mutationHistoryRepository);
    }

    @Override
    public void write(List<? extends AbstractMutationHistoryEntity> items) throws Exception {
        super.write(items);
        mutationHistoryRepository.flush();
        entityManager.clear();
    }

}

public class MirrorItemWriter  extends RepositoryItemWriter<SumoCmMemberEntity> {

    @PersistenceContext
    private EntityManager entityManager;

    private final SumoCmMemberRepository sumoCmMemberRepository;

    public MirrorItemWriter(SumoCmMemberRepository sumoCmMemberRepository) {
        this.sumoCmMemberRepository = sumoCmMemberRepository;
        setRepository(sumoCmMemberRepository);
    }

    @Override
    public void write(List<? extends SumoCmMemberEntity> items) throws Exception {
        super.write(items);
        sumoCmMemberRepository.flush();
        entityManager.clear();
    }

}

有趣的是:

  • 在刷新编写器并清除 entityManagers 后,写入 xml 也突然大幅加速(第 1 步),这对我来说没有意义,但它有效(我不使用异步处理或多线程)。
  • HibernateCursorItemReader 现在也执行得更正常了。我不确定在我的初始测试期间触发的大量内存消耗行为是什么(HibernateCursorItemReader 与 RepositoryItemReader 之间的区别),但在这一点上并不重要。

我执行了两次测试,中间有一次休息, 处理 3 步 * 200k 条记录(总共 600k),这 2 次。

它基本上从 4 小时 + 堆溢出到第一个只有几分钟 运行:

在第二个 运行 上,我看到再次发生了一些退化,但考虑到之前的情况,我仍然认为这是快速的:

出于某种原因,内存只是不想让它中断。 我开始把它当作个人 (LoL)。
处理器短暂出现峰值,但没有出现异常。

这对我来说不是真正的问题,因为 200k 记录只是一个负载测试。初始状态将通过 sql 脚本设置,因此这不是真正的问题。随之而来的突变最多每周会涉及几千个。
我们还将进行 Influx / Grafana 监控,因此我们将能够监控 jvm + 具有警报。
不过还是有点困扰我,所以我会在今天剩下的时间里继续做一些测试。

编辑: 将所有 HibernateCursorItemReaders 替换为 RepositoryItemReaders 后,内存消耗看起来不错。
下图表示 2 个批次 运行s,每个批次总共 600k 条记录:

处理一个奇怪的问题,我的记录只有一半存储在数据库中,但这可能是因为一个不相关的错误。

编辑: 因为 RepositoryItemReader 是分页的,所以只存储了一半的记录。我同时修复了 HibernateItemReader 的内存错误。

@Component
public class OptInItemReader extends HibernateCursorItemReader<BodiCmMemberEntity> {

    SessionFactory sessionFactory;

    private static final String ITEM_READER_NAME = "OPT_IN_ITEM_READER";
    private static final String QUERY_FIND_OPT_INS =

            """
            Your query 
            """;

    public OptInItemReader(SessionFactory sessionFactory) throws Exception {
        this.sessionFactory = sessionFactory;
        setName(ITEM_READER_NAME);
        setSessionFactory(sessionFactory);
        setQueryProvider(provider(QUERY_FIND_OPT_INS, sessionFactory));
    }

    private HibernateNativeQueryProvider<BodiCmMemberEntity> provider(String query, SessionFactory sessionFactory) {
        HibernateNativeQueryProvider<BodiCmMemberEntity> provider = new HibernateNativeQueryProvider<>();
        provider.setSqlQuery(query);
        provider.setEntityClass(BodiCmMemberEntity.class);
        return provider;
    }

}

我分配了一个会话,在该会话上我在提供商中手动开始了一个不需要的事务。现在冲洗在内部得到妥善管理。