Spring 批处理内存过顶
Spring batch memory through the roof
我正在处理 Spring 批处理中的严重内存问题,这是一个令人担忧的问题。
流程很简单:从 Oracle 读取,转换为另一个 object 类型,然后将其写回另一个 table。它涉及 20 万条记录。
我尝试了 HibernateCursorItemReader 和 RepositoryItemReader.
我的作业执行以下步骤:
@Bean
public Step optInMutationHistoryStep() {
return stepBuilderFactory
.get(STEP_NAME)
.<BodiCmMemberEntity, AbstractMutationHistoryEntity> chunk(5)
.reader(optInItemReader)
.processor(optInMutationHistoryItemProcessor)
.writer(mutationHistoryItemWriter)
.faultTolerant()
.skipPolicy(itemSkipPolicy)
.skip(Exception.class)
.listener((StepExecutionListener) optInCountListener)
.build();
}
Reader:
@Component
public class OptInItemReaderTest extends RepositoryItemReader<BodiCmMemberEntity> {
public OptInItemReaderTest(BodiCmMemberRepository bodiCmMemberRepository){
setRepository(bodiCmMemberRepository);
setMethodName("findAllOptIns");
setPageSize(100);
Map<String, Sort.Direction> sort = new HashMap<>();
sort.put("member_number", Sort.Direction.ASC);
setSort(new HashMap<>(sort));
}
}
处理器:
@Component
@StepScope
public class OptInMutationHistoryItemProcessor implements ItemProcessor<CmMemberEntity, AbstractMutationHistoryEntity> {
Long jobId;
@BeforeStep
public void beforeStep(StepExecution stepExecution){
jobId = stepExecution.getJobExecutionId();
}
private final MutationHistoryBatchFactory mutationHistoryFactory;
public OptInMutationHistoryItemProcessor(MutationHistoryBatchFactory mutationHistoryFactory) {
this.mutationHistoryFactory = mutationHistoryFactory;
}
@Override
public AbstractMutationHistoryEntity process(CmMemberEntity cmMemberEntity){
return mutationHistoryFactory.addMutationHistoryEntity(cmMemberEntity, jobId, OPT_IN);
}
}
项目作者:
@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{
public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
setRepository(mutationHistoryRepository);
}
}
我在处理器中使用的工厂方法:
public AbstractMutationHistoryEntity addMutationHistoryEntity(CmMemberEntity cmMemberEntity, Long jobId, JobType jobType) {
return mutationHistoryEntityBuilder(cmMemberEntity, jobId, jobType, MutationType.ADD)
.editionCode(cmMemberEntity.getPoleEditionCodeEntity().getEditionCode())
.firstName(cmMemberEntity.getFirstName())
.lastName(cmMemberEntity.getLastName())
.streetName(cmMemberEntity.getStreetName())
.houseNumber(cmMemberEntity.getHouseNumber())
.box(cmMemberEntity.getBox())
.postalCode(cmMemberEntity.getPostalCode())
.numberPieces(DEFAULT_NUMBER_PIECES)
.build();
}
我没有看到内存中保存的直接引用,不确定是什么原因导致内存快速增加。翻倍很快。
直觉上我觉得问题要么是检索到的结果集没有定期刷新,要么是处理器以某种方式泄漏,但不确定为什么,因为我没有保存对我创建的 objects 的引用在处理器中。
有什么建议吗?
编辑:
我的完整工作如下:
@Bean
public Job optInJob(
OptInJobCompletionNotificationListener listener,
@Qualifier("optInSumoStep") Step optInSumoStep,
@Qualifier("optInMutationHistoryStep") Step optInMutationHistoryStep,
@Qualifier("optInMirrorStep") Step optInMirrorStep) {
return jobBuilderFactory.get(OPT_IN_JOB)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(optInSumoStep)
.next(optInMutationHistoryStep)
.next(optInMirrorStep)
.end()
.build();
}
在第一步中,使用相同的 itemreader 将记录写入 XML。第二步,执行最初共享的步骤。
在 OpenShift 中,很明显没有任何东西被清理,而且我没有任何引用 afaik;我不知道为什么我会:
很明显,消耗确实在一段时间后趋于平缓,但 ik 一直在上升,它永远不会下降。在第一步(大约 680Mb)之后,我本以为它会有。更重要的是,我希望曲线在第一步中也能变平;将块大小增加到 100 后,每处理 100 个块就会释放内存。
HibernateCursorItemReader 非常糟糕;在第一步中它已经上升到 700Mb。 repositoryItemWriter 似乎表现更好。也许是有原因的,但我不清楚是哪一个。
我现在不能说工作结束后是否会清理任何东西;因为它处理 200k 条记录,需要一些时间,我认为它会在完成之前再次 运行 内存不足。
我担心如果我们不设法解决这个问题,我们将无法投入生产。
编辑 2:
有趣的是,在第 2 步中,曲线已经变平到内存消耗没有增加的程度。这很有趣;我目前无法说出为什么是 'now'.
目前的希望/期望是第 3 步不会增加内存消耗,并且在作业完成后清理内存。
看来插入速度已经大大减慢了。我估计是 3 倍(第二步大约有 110k 条记录)。
编辑 3:
在 itemwriter 上应用 flush 对减少内存消耗或速度没有任何作用:
其实这次是运行直奔我的极限,但是我无法解释:
你可以清楚地看到这个过程是如何变慢的,只是读取、转换、写入记录。
我目前不知道为什么,但对于批处理应用程序,我不认为这是我们将其移至生产环境的可接受行为table。两批 运行 之后,它就会变得陈旧。
感谢@Mahmoud Ben Hassine 和@M,问题得到解决(或至少达到可接受的水平)。 Deinum.
itemWriters 如下所示:
@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{
@PersistenceContext
private EntityManager entityManager;
private final MutationHistoryRepository mutationHistoryRepository;
public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
this.mutationHistoryRepository = mutationHistoryRepository;
setRepository(mutationHistoryRepository);
}
@Override
public void write(List<? extends AbstractMutationHistoryEntity> items) throws Exception {
super.write(items);
mutationHistoryRepository.flush();
entityManager.clear();
}
}
public class MirrorItemWriter extends RepositoryItemWriter<SumoCmMemberEntity> {
@PersistenceContext
private EntityManager entityManager;
private final SumoCmMemberRepository sumoCmMemberRepository;
public MirrorItemWriter(SumoCmMemberRepository sumoCmMemberRepository) {
this.sumoCmMemberRepository = sumoCmMemberRepository;
setRepository(sumoCmMemberRepository);
}
@Override
public void write(List<? extends SumoCmMemberEntity> items) throws Exception {
super.write(items);
sumoCmMemberRepository.flush();
entityManager.clear();
}
}
有趣的是:
- 在刷新编写器并清除 entityManagers 后,写入 xml 也突然大幅加速(第 1 步),这对我来说没有意义,但它有效(我不使用异步处理或多线程)。
- HibernateCursorItemReader 现在也执行得更正常了。我不确定在我的初始测试期间触发的大量内存消耗行为是什么(HibernateCursorItemReader 与 RepositoryItemReader 之间的区别),但在这一点上并不重要。
我执行了两次测试,中间有一次休息,
处理 3 步 * 200k 条记录(总共 600k),这 2 次。
它基本上从 4 小时 + 堆溢出到第一个只有几分钟 运行:
在第二个 运行 上,我看到再次发生了一些退化,但考虑到之前的情况,我仍然认为这是快速的:
出于某种原因,内存只是不想让它中断。
我开始把它当作个人 (LoL)。
处理器短暂出现峰值,但没有出现异常。
这对我来说不是真正的问题,因为 200k 记录只是一个负载测试。初始状态将通过 sql 脚本设置,因此这不是真正的问题。随之而来的突变最多每周会涉及几千个。
我们还将进行 Influx / Grafana 监控,因此我们将能够监控 jvm + 具有警报。
不过还是有点困扰我,所以我会在今天剩下的时间里继续做一些测试。
编辑:
将所有 HibernateCursorItemReaders 替换为 RepositoryItemReaders 后,内存消耗看起来不错。
下图表示 2 个批次 运行s,每个批次总共 600k 条记录:
处理一个奇怪的问题,我的记录只有一半存储在数据库中,但这可能是因为一个不相关的错误。
编辑:
因为 RepositoryItemReader 是分页的,所以只存储了一半的记录。我同时修复了 HibernateItemReader 的内存错误。
@Component
public class OptInItemReader extends HibernateCursorItemReader<BodiCmMemberEntity> {
SessionFactory sessionFactory;
private static final String ITEM_READER_NAME = "OPT_IN_ITEM_READER";
private static final String QUERY_FIND_OPT_INS =
"""
Your query
""";
public OptInItemReader(SessionFactory sessionFactory) throws Exception {
this.sessionFactory = sessionFactory;
setName(ITEM_READER_NAME);
setSessionFactory(sessionFactory);
setQueryProvider(provider(QUERY_FIND_OPT_INS, sessionFactory));
}
private HibernateNativeQueryProvider<BodiCmMemberEntity> provider(String query, SessionFactory sessionFactory) {
HibernateNativeQueryProvider<BodiCmMemberEntity> provider = new HibernateNativeQueryProvider<>();
provider.setSqlQuery(query);
provider.setEntityClass(BodiCmMemberEntity.class);
return provider;
}
}
我分配了一个会话,在该会话上我在提供商中手动开始了一个不需要的事务。现在冲洗在内部得到妥善管理。
我正在处理 Spring 批处理中的严重内存问题,这是一个令人担忧的问题。
流程很简单:从 Oracle 读取,转换为另一个 object 类型,然后将其写回另一个 table。它涉及 20 万条记录。
我尝试了 HibernateCursorItemReader 和 RepositoryItemReader.
我的作业执行以下步骤:
@Bean
public Step optInMutationHistoryStep() {
return stepBuilderFactory
.get(STEP_NAME)
.<BodiCmMemberEntity, AbstractMutationHistoryEntity> chunk(5)
.reader(optInItemReader)
.processor(optInMutationHistoryItemProcessor)
.writer(mutationHistoryItemWriter)
.faultTolerant()
.skipPolicy(itemSkipPolicy)
.skip(Exception.class)
.listener((StepExecutionListener) optInCountListener)
.build();
}
Reader:
@Component
public class OptInItemReaderTest extends RepositoryItemReader<BodiCmMemberEntity> {
public OptInItemReaderTest(BodiCmMemberRepository bodiCmMemberRepository){
setRepository(bodiCmMemberRepository);
setMethodName("findAllOptIns");
setPageSize(100);
Map<String, Sort.Direction> sort = new HashMap<>();
sort.put("member_number", Sort.Direction.ASC);
setSort(new HashMap<>(sort));
}
}
处理器:
@Component
@StepScope
public class OptInMutationHistoryItemProcessor implements ItemProcessor<CmMemberEntity, AbstractMutationHistoryEntity> {
Long jobId;
@BeforeStep
public void beforeStep(StepExecution stepExecution){
jobId = stepExecution.getJobExecutionId();
}
private final MutationHistoryBatchFactory mutationHistoryFactory;
public OptInMutationHistoryItemProcessor(MutationHistoryBatchFactory mutationHistoryFactory) {
this.mutationHistoryFactory = mutationHistoryFactory;
}
@Override
public AbstractMutationHistoryEntity process(CmMemberEntity cmMemberEntity){
return mutationHistoryFactory.addMutationHistoryEntity(cmMemberEntity, jobId, OPT_IN);
}
}
项目作者:
@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{
public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
setRepository(mutationHistoryRepository);
}
}
我在处理器中使用的工厂方法:
public AbstractMutationHistoryEntity addMutationHistoryEntity(CmMemberEntity cmMemberEntity, Long jobId, JobType jobType) {
return mutationHistoryEntityBuilder(cmMemberEntity, jobId, jobType, MutationType.ADD)
.editionCode(cmMemberEntity.getPoleEditionCodeEntity().getEditionCode())
.firstName(cmMemberEntity.getFirstName())
.lastName(cmMemberEntity.getLastName())
.streetName(cmMemberEntity.getStreetName())
.houseNumber(cmMemberEntity.getHouseNumber())
.box(cmMemberEntity.getBox())
.postalCode(cmMemberEntity.getPostalCode())
.numberPieces(DEFAULT_NUMBER_PIECES)
.build();
}
我没有看到内存中保存的直接引用,不确定是什么原因导致内存快速增加。翻倍很快。
直觉上我觉得问题要么是检索到的结果集没有定期刷新,要么是处理器以某种方式泄漏,但不确定为什么,因为我没有保存对我创建的 objects 的引用在处理器中。
有什么建议吗?
编辑:
我的完整工作如下:
@Bean
public Job optInJob(
OptInJobCompletionNotificationListener listener,
@Qualifier("optInSumoStep") Step optInSumoStep,
@Qualifier("optInMutationHistoryStep") Step optInMutationHistoryStep,
@Qualifier("optInMirrorStep") Step optInMirrorStep) {
return jobBuilderFactory.get(OPT_IN_JOB)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(optInSumoStep)
.next(optInMutationHistoryStep)
.next(optInMirrorStep)
.end()
.build();
}
在第一步中,使用相同的 itemreader 将记录写入 XML。第二步,执行最初共享的步骤。
在 OpenShift 中,很明显没有任何东西被清理,而且我没有任何引用 afaik;我不知道为什么我会:
很明显,消耗确实在一段时间后趋于平缓,但 ik 一直在上升,它永远不会下降。在第一步(大约 680Mb)之后,我本以为它会有。更重要的是,我希望曲线在第一步中也能变平;将块大小增加到 100 后,每处理 100 个块就会释放内存。
HibernateCursorItemReader 非常糟糕;在第一步中它已经上升到 700Mb。 repositoryItemWriter 似乎表现更好。也许是有原因的,但我不清楚是哪一个。
我现在不能说工作结束后是否会清理任何东西;因为它处理 200k 条记录,需要一些时间,我认为它会在完成之前再次 运行 内存不足。
我担心如果我们不设法解决这个问题,我们将无法投入生产。
编辑 2:
有趣的是,在第 2 步中,曲线已经变平到内存消耗没有增加的程度。这很有趣;我目前无法说出为什么是 'now'.
目前的希望/期望是第 3 步不会增加内存消耗,并且在作业完成后清理内存。 看来插入速度已经大大减慢了。我估计是 3 倍(第二步大约有 110k 条记录)。
编辑 3: 在 itemwriter 上应用 flush 对减少内存消耗或速度没有任何作用:
其实这次是运行直奔我的极限,但是我无法解释:
你可以清楚地看到这个过程是如何变慢的,只是读取、转换、写入记录。
我目前不知道为什么,但对于批处理应用程序,我不认为这是我们将其移至生产环境的可接受行为table。两批 运行 之后,它就会变得陈旧。
感谢@Mahmoud Ben Hassine 和@M,问题得到解决(或至少达到可接受的水平)。 Deinum.
itemWriters 如下所示:
@Component
public class MutationHistoryItemWriter extends RepositoryItemWriter<AbstractMutationHistoryEntity>{
@PersistenceContext
private EntityManager entityManager;
private final MutationHistoryRepository mutationHistoryRepository;
public MutationHistoryItemWriter(MutationHistoryRepository mutationHistoryRepository) {
this.mutationHistoryRepository = mutationHistoryRepository;
setRepository(mutationHistoryRepository);
}
@Override
public void write(List<? extends AbstractMutationHistoryEntity> items) throws Exception {
super.write(items);
mutationHistoryRepository.flush();
entityManager.clear();
}
}
public class MirrorItemWriter extends RepositoryItemWriter<SumoCmMemberEntity> {
@PersistenceContext
private EntityManager entityManager;
private final SumoCmMemberRepository sumoCmMemberRepository;
public MirrorItemWriter(SumoCmMemberRepository sumoCmMemberRepository) {
this.sumoCmMemberRepository = sumoCmMemberRepository;
setRepository(sumoCmMemberRepository);
}
@Override
public void write(List<? extends SumoCmMemberEntity> items) throws Exception {
super.write(items);
sumoCmMemberRepository.flush();
entityManager.clear();
}
}
有趣的是:
- 在刷新编写器并清除 entityManagers 后,写入 xml 也突然大幅加速(第 1 步),这对我来说没有意义,但它有效(我不使用异步处理或多线程)。
- HibernateCursorItemReader 现在也执行得更正常了。我不确定在我的初始测试期间触发的大量内存消耗行为是什么(HibernateCursorItemReader 与 RepositoryItemReader 之间的区别),但在这一点上并不重要。
我执行了两次测试,中间有一次休息, 处理 3 步 * 200k 条记录(总共 600k),这 2 次。
它基本上从 4 小时 + 堆溢出到第一个只有几分钟 运行:
在第二个 运行 上,我看到再次发生了一些退化,但考虑到之前的情况,我仍然认为这是快速的:
出于某种原因,内存只是不想让它中断。
我开始把它当作个人 (LoL)。
处理器短暂出现峰值,但没有出现异常。
这对我来说不是真正的问题,因为 200k 记录只是一个负载测试。初始状态将通过 sql 脚本设置,因此这不是真正的问题。随之而来的突变最多每周会涉及几千个。
我们还将进行 Influx / Grafana 监控,因此我们将能够监控 jvm + 具有警报。
不过还是有点困扰我,所以我会在今天剩下的时间里继续做一些测试。
编辑:
将所有 HibernateCursorItemReaders 替换为 RepositoryItemReaders 后,内存消耗看起来不错。
下图表示 2 个批次 运行s,每个批次总共 600k 条记录:
处理一个奇怪的问题,我的记录只有一半存储在数据库中,但这可能是因为一个不相关的错误。
编辑: 因为 RepositoryItemReader 是分页的,所以只存储了一半的记录。我同时修复了 HibernateItemReader 的内存错误。
@Component
public class OptInItemReader extends HibernateCursorItemReader<BodiCmMemberEntity> {
SessionFactory sessionFactory;
private static final String ITEM_READER_NAME = "OPT_IN_ITEM_READER";
private static final String QUERY_FIND_OPT_INS =
"""
Your query
""";
public OptInItemReader(SessionFactory sessionFactory) throws Exception {
this.sessionFactory = sessionFactory;
setName(ITEM_READER_NAME);
setSessionFactory(sessionFactory);
setQueryProvider(provider(QUERY_FIND_OPT_INS, sessionFactory));
}
private HibernateNativeQueryProvider<BodiCmMemberEntity> provider(String query, SessionFactory sessionFactory) {
HibernateNativeQueryProvider<BodiCmMemberEntity> provider = new HibernateNativeQueryProvider<>();
provider.setSqlQuery(query);
provider.setEntityClass(BodiCmMemberEntity.class);
return provider;
}
}
我分配了一个会话,在该会话上我在提供商中手动开始了一个不需要的事务。现在冲洗在内部得到妥善管理。