以编程方式停止在 BlockingQueue 上同步的 Spring 批处理并行流

Programmatically stop Spring batch parallel flows synchronized on BlockingQueue

我有一个 spring 批处理作业,基本上是从文件中读取、处理每一行并写入输出(另一个文件)。 由于处理步骤成本很高,我想在多个线程中使用它 运行,但由于读取和写入步骤使用文件,因此这些步骤必须在单个线程中 运行。 我最终有 3 个流,每个 运行ning 并行,每个步骤一个,在 2 个 BlockingQueues 上同步。 读取步骤从文件读取并写入一个队列。 处理步骤是多线程的,从队列中读取,处理并写入另一个队列。 写入步骤,从第二个队列读取并将输出写入另一个文件。

它工作得很好,除了我找不到一个干净的 'fast' 方法来在一切都完成后停止工作。现在我在两个队列上使用 'poll' 和超时,并假设如果在几秒钟内没有项目存在,那么我们就完成了。这会将作业终止延迟指定的秒数,我不能使用非常少的时间,因为某些外力(如机器负载)作业可能会延迟。

我尝试使用像毒丸之类的东西,但问题是如果我将 FlatFileItemReader 上的 'doRead' 方法重写为 return 毒丸,当它获得 'null' (表示文件结束)那么这个 reader 将永远不会结束并且作业永远不会终止。

有人有什么建议吗?从文档中我知道我可能只在读取步骤(文件)的 reader 和写入步骤(文件)的写入器上放置一个 "synchronized",但我真的更喜欢不同的解决方案。

您可以在 reader 中添加一个状态变量来跟踪作业的结束。

public PoisoningReader<T> extends FlatFileItemReader<T> {
    private boolean endJob = false;

    @Override
    public T doRead() {
        if (endJob) {
            return null;
        }

        T object = super.doRead();
        if (object == null) {
            endJob = true;
            return new PoisonPill();
        }
        return item;
    }

所以,我将 post 我的解决方案,以防有人感兴趣或遇到类似问题。

我,总结一下,我最终按照 Dean Clark 的建议使用了毒丸。 我最终简化了工作,只使用一个 BlockingQueue,但我仍然有如何注入毒药丸的问题,因为它是一个在步骤之间共享的队列,而不是在一个步骤中。.

基本上,我不是把 Reader 搞砸到 return 毒丸,而是让处理器检测并忽略它,我只是让 Spring 批处理 运行 通常,我只是在负责注射毒丸的步骤中添加了一个侦听器。此侦听器覆盖 "afterStep" 并将其添加到队列中。 从 Queue 中读取的 Step 将获得 Poison Pill 和 Queue 的末尾,表示 "no more work to do" 并将通过 returning null.

正常终止

另一个 'quirk' 是在一个作业中,从队列读取的步骤配置了一个线程池来并行处理项目,所以我需要 kill/unblock 所有线程从队列。一个不错的技巧是让 Reader 从队列中读取,如果它是毒丸,只需将它重新注入队列并且 return 为空。这样每个线程都会得到一个毒丸并正确终止。