在 Java 中编写多线程映射迭代器

Writing a multithreaded mapping iterator in Java

我有一个通用的映射迭代器:像这样的东西:

class Mapper<F, T> implements Iterator<T> {

  private Iterator<F> input;
  private Action<F, T> action;

  public Mapper(input, action) {...}

  public boolean hasNext() {
    return input.hasNext();
  }

  public T next() {
    return action.process(input.next());
  }
}

现在,考虑到 action.process() 可能很耗时,我想通过使用多个线程并行处理来自输入的项目来提高性能。我想分配一个 N 个工作线程的池,并将项目分配给这些线程进行处理。这应该会发生 "behind the scenes",因此客户端代码只会看到一个迭代器。代码应避免在内存中保存输入或输出序列。

为了增加一点变化,我想要两个版本的解决方案,一个保留顺序(最终迭代器以与输入迭代器相同的顺序交付项目),另一个不一定保留顺序(每个输出项目有货即送。

我有点让这个工作正常,但代码似乎令人费解且不可靠,我不确定它是否使用了最佳实践。

关于最简单和最可靠的实现方法有什么建议吗?我正在寻找适用于 JDK 6 的东西,并且我希望尽可能避免引入对外部 libraries/frameworks 的依赖。

为了并行调用 action.process,需要并行调用 next()。这不是好的做法。相反,您可以使用 ExecutorCompletionService.

不幸的是,我认为这只会让您选择保留顺序。

我认为它不能与并行线程一起工作,因为 hasNext() 可能 return 为真,但是当线程调用 next() 时可能没有更多元素。最好只使用 next() ,当没有更多元素时 return null

我建议查看 JDK 执行器框架。为您的操作创建任务 (运行nables)。 运行 如果需要,则使用线程池并行处理它们,如果不需要,则按顺序处理。如果最后需要排序,请给出任务序号。但是正如在其他答案中指出的那样,迭代器对您来说效果不佳,因为通常不会并行调用 next() 。那么您甚至需要迭代器还是只是为了处理任务?

我会为线程使用线程池,并使用 BlockingQueue 从池中获取数据。

这似乎适用于我的简单测试用例。

interface Action<F, T> {

    public T process(F f);

}

class Mapper<F, T> implements Iterator<T> {

    protected final Iterator<F> input;
    protected final Action<F, T> action;

    public Mapper(Iterator<F> input, Action<F, T> action) {
        this.input = input;
        this.action = action;
    }

    @Override
    public boolean hasNext() {
        return input.hasNext();
    }

    @Override
    public T next() {
        return action.process(input.next());
    }
}

class ParallelMapper<F, T> extends Mapper<F, T> {

    // The pool.
    final ExecutorService pool;
    // The queue.
    final BlockingQueue<T> queue;
    // The next one to deliver.
    private T next = null;

    public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
        super(input, action);
        // Start my pool.
        pool = Executors.newFixedThreadPool(threads);
        // And the queue.
        queue = new ArrayBlockingQueue<>(queueLength);
    }

    class Worker implements Runnable {

        final F f;
        private T t;

        public Worker(F f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                queue.put(action.process(f));
            } catch (InterruptedException ex) {
                // Not sure what you can do here.
            }
        }

    }

    @Override
    public boolean hasNext() {
        // All done if delivered it and the input is empty and the queue is empty and the threads are finished.
        while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
            // First look in the queue.
            next = queue.poll();
            if (next == null) {
                // Queue empty.
                if (input.hasNext()) {
                    // Start a new worker.
                    pool.execute(new Worker(input.next()));
                }
            } else {
                // Input exhausted - shut down the pool - unless we already have.
                if (!pool.isShutdown()) {
                    pool.shutdown();
                }
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        T n = next;
        if (n != null) {
            // Delivered that one.
            next = null;
        } else {
            // Fails.
            throw new NoSuchElementException();
        }
        return n;
    }
}

public void test() {
    List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
    System.out.println("Data");
    for (Integer i : Iterables.in(data)) {
        System.out.println(i);
    }
    Action<Integer, Integer> action = new Action<Integer, Integer>() {

        @Override
        public Integer process(Integer f) {
            try {
                // Wait that many seconds.
                Thread.sleep(1000L * f);
            } catch (InterruptedException ex) {
                // Just give up.
            }
            // Return it unchanged.
            return f;
        }

    };
    System.out.println("Processed");
    for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
        System.out.println(i);
    }
    System.out.println("Parallel Processed");
    for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
        System.out.println(i);
    }

}

注意:Iterables.in(Iterator<T>) 只是创建一个 Iterable<T> 来封装传递的 Iterator<T>

对于您的顺序,您可以处理 Pair<Integer,F> 并使用 PriorityQueue 作为线程输出。然后您可以安排按顺序拉动它们。

好的,谢谢大家。这就是我所做的。

首先,我将 ItemMappingFunction 包装在 Callable 中:

private static class CallableAction<F extends Item, T extends Item> 
implements Callable<T> {
    private ItemMappingFunction<F, T> action;
    private F input;
    public CallableAction(ItemMappingFunction<F, T> action, F input) {
            this.action = action;
            this.input = input;
    }
    public T call() throws XPathException {
            return action.mapItem(input);
    }
}

我用标准迭代器 class 描述了我的问题,但实际上我使用的是我自己的 SequenceIterator 接口,它有一个 next() 方法,最后 returns null-顺序。

我根据 "ordinary" 映射迭代器声明 class,如下所示:

public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {

    private ExecutorService service;
    private BlockingQueue<Future<T>> resultQueue = 
        new LinkedBlockingQueue<Future<T>>();

在初始化时,我创建服务并启动队列:

public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
        super(base, action);

        int maxThreads = Runtime.getRuntime().availableProcessors();
        maxThreads = maxThreads > 0 ? maxThreads : 1;
        service = Executors.newFixedThreadPool(maxThreads);

        // prime the queue
        int n = 0;
        while (n++ < maxThreads) {
            F item = (F) base.next();
            if (item == null) {
                return;
            }
            mapOneItem(item);
        }
    }

其中 mapOneItem 是:

private void mapOneItem(F in) throws XPathException {
    Future<T> future = service.submit(new CallableAction(action, in));
    resultQueue.add(future);
}

当client请求下一个item时,我先将下一个input item提交给executor service,然后获取下一个output item,如果需要就等待它可用:

    public T next() throws XPathException {
        F nextIn = (F)base.next();
        if (nextIn != null) {
            mapOneItem(nextIn);
        }
        try {
            Future<T> future = resultQueue.poll();
            if (future == null) {
                service.shutdown();
                return null;
            } else {
                return future.get();
            }
        } catch (InterruptedException e) {
            throw new XPathException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof XPathException) {
                throw (XPathException)e.getCause();
            }
            throw new XPathException(e);
        }
    }