以编程方式停止在 BlockingQueue 上同步的 Spring 批处理并行流
Programmatically stop Spring batch parallel flows synchronized on BlockingQueue
我有一个 spring 批处理作业,基本上是从文件中读取、处理每一行并写入输出(另一个文件)。
由于处理步骤成本很高,我想在多个线程中使用它 运行,但由于读取和写入步骤使用文件,因此这些步骤必须在单个线程中 运行。
我最终有 3 个流,每个 运行ning 并行,每个步骤一个,在 2 个 BlockingQueues 上同步。
读取步骤从文件读取并写入一个队列。
处理步骤是多线程的,从队列中读取,处理并写入另一个队列。
写入步骤,从第二个队列读取并将输出写入另一个文件。
它工作得很好,除了我找不到一个干净的 'fast' 方法来在一切都完成后停止工作。现在我在两个队列上使用 'poll' 和超时,并假设如果在几秒钟内没有项目存在,那么我们就完成了。这会将作业终止延迟指定的秒数,我不能使用非常少的时间,因为某些外力(如机器负载)作业可能会延迟。
我尝试使用像毒丸之类的东西,但问题是如果我将 FlatFileItemReader 上的 'doRead' 方法重写为 return 毒丸,当它获得 'null' (表示文件结束)那么这个 reader 将永远不会结束并且作业永远不会终止。
有人有什么建议吗?从文档中我知道我可能只在读取步骤(文件)的 reader 和写入步骤(文件)的写入器上放置一个 "synchronized",但我真的更喜欢不同的解决方案。
您可以在 reader 中添加一个状态变量来跟踪作业的结束。
public PoisoningReader<T> extends FlatFileItemReader<T> {
private boolean endJob = false;
@Override
public T doRead() {
if (endJob) {
return null;
}
T object = super.doRead();
if (object == null) {
endJob = true;
return new PoisonPill();
}
return item;
}
所以,我将 post 我的解决方案,以防有人感兴趣或遇到类似问题。
我,总结一下,我最终按照 Dean Clark 的建议使用了毒丸。
我最终简化了工作,只使用一个 BlockingQueue,但我仍然有如何注入毒药丸的问题,因为它是一个在步骤之间共享的队列,而不是在一个步骤中。.
基本上,我不是把 Reader 搞砸到 return 毒丸,而是让处理器检测并忽略它,我只是让 Spring 批处理 运行 通常,我只是在负责注射毒丸的步骤中添加了一个侦听器。此侦听器覆盖 "afterStep" 并将其添加到队列中。
从 Queue 中读取的 Step 将获得 Poison Pill 和 Queue 的末尾,表示 "no more work to do" 并将通过 returning null.
正常终止
另一个 'quirk' 是在一个作业中,从队列读取的步骤配置了一个线程池来并行处理项目,所以我需要 kill/unblock 所有线程从队列。一个不错的技巧是让 Reader 从队列中读取,如果它是毒丸,只需将它重新注入队列并且 return 为空。这样每个线程都会得到一个毒丸并正确终止。
我有一个 spring 批处理作业,基本上是从文件中读取、处理每一行并写入输出(另一个文件)。 由于处理步骤成本很高,我想在多个线程中使用它 运行,但由于读取和写入步骤使用文件,因此这些步骤必须在单个线程中 运行。 我最终有 3 个流,每个 运行ning 并行,每个步骤一个,在 2 个 BlockingQueues 上同步。 读取步骤从文件读取并写入一个队列。 处理步骤是多线程的,从队列中读取,处理并写入另一个队列。 写入步骤,从第二个队列读取并将输出写入另一个文件。
它工作得很好,除了我找不到一个干净的 'fast' 方法来在一切都完成后停止工作。现在我在两个队列上使用 'poll' 和超时,并假设如果在几秒钟内没有项目存在,那么我们就完成了。这会将作业终止延迟指定的秒数,我不能使用非常少的时间,因为某些外力(如机器负载)作业可能会延迟。
我尝试使用像毒丸之类的东西,但问题是如果我将 FlatFileItemReader 上的 'doRead' 方法重写为 return 毒丸,当它获得 'null' (表示文件结束)那么这个 reader 将永远不会结束并且作业永远不会终止。
有人有什么建议吗?从文档中我知道我可能只在读取步骤(文件)的 reader 和写入步骤(文件)的写入器上放置一个 "synchronized",但我真的更喜欢不同的解决方案。
您可以在 reader 中添加一个状态变量来跟踪作业的结束。
public PoisoningReader<T> extends FlatFileItemReader<T> {
private boolean endJob = false;
@Override
public T doRead() {
if (endJob) {
return null;
}
T object = super.doRead();
if (object == null) {
endJob = true;
return new PoisonPill();
}
return item;
}
所以,我将 post 我的解决方案,以防有人感兴趣或遇到类似问题。
我,总结一下,我最终按照 Dean Clark 的建议使用了毒丸。 我最终简化了工作,只使用一个 BlockingQueue,但我仍然有如何注入毒药丸的问题,因为它是一个在步骤之间共享的队列,而不是在一个步骤中。.
基本上,我不是把 Reader 搞砸到 return 毒丸,而是让处理器检测并忽略它,我只是让 Spring 批处理 运行 通常,我只是在负责注射毒丸的步骤中添加了一个侦听器。此侦听器覆盖 "afterStep" 并将其添加到队列中。 从 Queue 中读取的 Step 将获得 Poison Pill 和 Queue 的末尾,表示 "no more work to do" 并将通过 returning null.
正常终止另一个 'quirk' 是在一个作业中,从队列读取的步骤配置了一个线程池来并行处理项目,所以我需要 kill/unblock 所有线程从队列。一个不错的技巧是让 Reader 从队列中读取,如果它是毒丸,只需将它重新注入队列并且 return 为空。这样每个线程都会得到一个毒丸并正确终止。