使用执行器处理 Java 中流的子流

Processing sub-streams of a stream in Java using executors

我有一个程序可以处理通过网络传入的大量数据流(不是 java.util.stream 的意思,而是 InputStream 的意思)。流由对象组成,每个对象都有一种子流标识符。现在整个处理都是在一个线程中完成的,但是需要很多CPU时间,而且每个子流都可以很容易地独立处理,所以我正在考虑多线程。

然而,每个子流都需要保留大量庞大的状态,包括各种缓冲区、哈希映射等。没有特别的理由让它并发或同步,因为子流彼此独立。此外,每个子流都要求其对象按照它们到达的顺序进行处理,这意味着每个子流可能应该有一个线程(但可能一个线程处理多个子流)。

我正在考虑几种方法,但它们都不太优雅。

  1. 为所有任务创建一个 ThreadPoolExecutor。每个任务将包含下一个要处理的对象和对保持所有状态的 Processor 实例的引用。这将确保必要的先行发生关系,从而确保处理线程将看到该子流的最新状态。据我所知,这种方法无法确保同一子流的下一个对象将在同一线程中处理。此外,它需要一些保证对象将按照它们进入的顺序进行处理,这将需要对 Processor 个对象进行额外的同步,从而引入不必要的延迟。

  2. 手动创建多个单线程执行器和一种将子流标识符映射到执行器的哈希映射。这种方法需要手动管理执行器,在新的子流开始或结束时创建或关闭它们,并相应地在它们之间分配任务。

  3. 创建一个自定义执行器来处理一个特殊的任务子类,每个任务都有一个子流 ID。此执行程序将使用它作为提示,使用与具有相同 ID 的前一个线程相同的线程来执行此任务。但是,我看不到实现此类执行程序的简单方法。不幸的是,似乎不可能扩展任何现有的执行器 类,从头开始实现执行器有点矫枉过正。

  4. 创建一个 ThreadPoolExecutor,但不是为每个传入对象创建一个任务,而是为每个会阻塞的子流创建一个长 运行 任务一个并发队列,等待下一个对象。然后根据子流ID将对象放入队列中。这种方法需要与子流一样多的线程,因为任务将被阻塞。预计子流数量在30-60个左右,可以接受

  5. 或者,按照 4 进行,但限制线程数,将多个子流分配给单个任务。这是 2 和 4 之间的混合体。据我所知,这是其中最好的方法,但它仍然需要在任务之间进行某种手动子流分配,以及关闭额外任务的某种方式分流结束.

确保每个子流在其自己的线程中处理而没有大量容易出错的代码的最佳方法是什么?这样下面的伪代码就可以工作了:

// loop {
    Item next = stream.read();
    int id = next.getSubstreamID();
    Processor processor = getProcessor(id);
    SubstreamTask task = new SubstreamTask(processor, next, id);
    executor.submit(task); // This makes sure that the task will
                           // be executed in the same thread as the
                           // previous task with the same ID.
// } // loop

我建议使用一组单线程执行器。如果您可以为子流设计一致的哈希策略,则可以将子流映射到单独的线程。例如

final ExecutorsService[] es = ...

public void submit(int id, Runnable run) {
   es[(id & 0x7FFFFFFF) % es.length].submit(run);
}

密钥可以是 Stringlong,但可以通过某种方式识别子流。如果您知道某个特定的子流非常昂贵,您可以为其分配一个专用线程。

我最终选择的解决方案是这样的:

private final Executor[] streamThreads
        = new Executor[Runtime.getRuntime().availableProcessors()];
{
    for (int i = 0; i < streamThreads.length; ++i) {
        streamThreads[i] = Executors.newSingleThreadExecutor();
    }
}
private final ConcurrentHashMap<SubstreamId, Integer>
        threadById = new ConcurrentHashMap<>();

此代码决定使用哪个执行器:

    Message msg = in.readNext();
    SubstreamId msgSubstream = msg.getSubstreamId();
    int exe = threadById.computeIfAbsent(msgSubstream,
            id -> findBestExecutor());
    streamThreads[exe].execute(() -> {
        // processing goes here
    });

findBestExecutor()函数是这样的:

private int findBestExecutor() {
    // Thread index -> substream count mapping:
    final int[] loads = new int[streamThreads.length];
    for (int thread : threadById.values()) {
        ++loads[thread];
    }
    // return the index of the minimum load
    return IntStream.range(0, streamThreads.length)
            .reduce((i, j) -> loads[i] <= loads[j] ? i : j)
            .orElse(0);
}

当然,这不是很有效,但请注意,只有在出现新的子流时才会调用此函数(每隔几个小时会发生几次,所以对我来说这没什么大不了的) .我的真实代码看起来有点复杂,因为我有一种方法可以确定两个子流是否可能同时完成,如果是,我会尝试将它们分配给不同的线程,以便在它们完成后保持均匀的负载。但由于我在问题中从未提及这个细节,我想它也不属于答案。