Writing a multithreaded mapping iterator in Java
    class Mapper<F, T> implements Iterator<T> {

        private Iterator<F> input;
        private Action<F, T> action;

        public Mapper(input, action) {...}

        public boolean hasNext() {
            return input.hasNext();
        }

        public T next() {
            return action.process(input.next());
        }
    }
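Now, given that action.process() can be time-consuming, I want to improve performance by using multiple threads to process items from the input in parallel. I want to allocate a pool of N worker threads and hand items to these threads for processing. This should happen "behind the scenes", so the client code just sees an Iterator. The code should avoid holding either the input or the output sequence in memory.
Any suggestions for the simplest and most reliable way to implement this? I'm looking for something that works with JDK 6, and I'd like to avoid introducing dependencies on external libraries/frameworks if possible.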
To call action.process in parallel, next() would have to be called in parallel. That is not good practice. Instead, you could use an ExecutorCompletionService.
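A minimal sketch of the ExecutorCompletionService idea, under stated assumptions: the names CompletionAction and CompletionMapper are hypothetical, at most `threads` tasks are kept in flight so neither sequence is held in memory, and results are handed out in completion order rather than input order. Only the consumer thread touches the input iterator, so next() is never called in parallel. The syntax is kept Java 6 compatible.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the Action interface from the question.
interface CompletionAction<F, T> {
    T process(F f);
}

// Yields results in completion order, not input order.
class CompletionMapper<F, T> implements Iterator<T> {
    private final Iterator<F> input;
    private final CompletionAction<F, T> action;
    private final ExecutorService pool;
    private final CompletionService<T> completion;
    private final int maxInFlight;
    private int inFlight = 0;

    CompletionMapper(Iterator<F> input, CompletionAction<F, T> action, int threads) {
        this.input = input;
        this.action = action;
        this.pool = Executors.newFixedThreadPool(threads);
        this.completion = new ExecutorCompletionService<T>(pool);
        this.maxInFlight = threads;
        fill();
    }

    // Submit input items until maxInFlight tasks are pending.
    private void fill() {
        while (inFlight < maxInFlight && input.hasNext()) {
            final F item = input.next();
            completion.submit(new Callable<T>() {
                public T call() {
                    return action.process(item);
                }
            });
            inFlight++;
        }
        if (!input.hasNext()) {
            pool.shutdown(); // tasks already submitted still run to completion
        }
    }

    public boolean hasNext() {
        return inFlight > 0;
    }

    public T next() {
        if (inFlight == 0) {
            throw new NoSuchElementException();
        }
        try {
            Future<T> done = completion.take(); // blocks until some task finishes
            inFlight--;
            fill();
            return done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

If the client abandons the iterator before the input is exhausted, the pool in this sketch is never shut down; a real implementation would probably want an explicit close method.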
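I don't think it can work with parallel threads, because hasNext() may return true, yet by the time a thread calls next() there may be no more elements. It is better to use only next(), returning null when there are no more elements.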
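I suggest looking at the JDK executor framework. Create tasks (Runnables) for your actions. Run them in parallel on a thread pool if needed, or sequentially if not. If you need the results in order at the end, give the tasks sequence numbers. But as pointed out in other answers, an iterator does not work well for you, because next() is generally not called in parallel. So do you even need the iterator, or do you just need the tasks processed?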
I would use a thread pool for the threads, and a BlockingQueue to hand the results back:
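    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    interface Action<F, T> {
        public T process(F f);
    }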
    class Mapper<F, T> implements Iterator<T> {

        protected final Iterator<F> input;
        protected final Action<F, T> action;

        public Mapper(Iterator<F> input, Action<F, T> action) {
            this.input = input;
            this.action = action;
        }

        public boolean hasNext() {
            return input.hasNext();
        }

        public T next() {
            return action.process(input.next());
        }

        // Required on JDK 6, where Iterator.remove() has no default implementation.
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
    class ParallelMapper<F, T> extends Mapper<F, T> {

        // The pool.
        final ExecutorService pool;
        // The queue.
        final BlockingQueue<T> queue;
        // The next one to deliver.
        private T next = null;

        public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
            super(input, action);
            // Start my pool.
            pool = Executors.newFixedThreadPool(threads);
            // And the queue. (Explicit type argument: the diamond operator needs Java 7.)
            queue = new ArrayBlockingQueue<T>(queueLength);
        }

        class Worker implements Runnable {

            final F f;

            public Worker(F f) {
                this.f = f;
            }

            public void run() {
                try {
                    // Blocks while the queue is full.
                    queue.put(action.process(f));
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag; this item's result is dropped.
                    Thread.currentThread().interrupt();
                }
            }
        }

        public boolean hasNext() {
            // All done if delivered it and the input is empty and the threads are finished and the queue is empty.
            // isTerminated() is checked before isEmpty() so that a result queued just before termination is not missed.
            // Note that this loop polls without blocking while it waits for a result.
            while (next == null && (input.hasNext() || !pool.isTerminated() || !queue.isEmpty())) {
                // First look in the queue.
                next = queue.poll();
                if (next == null) {
                    // Queue empty.
                    if (input.hasNext()) {
                        // Start a new worker.
                        pool.execute(new Worker(input.next()));
                    } else {
                        // Input exhausted - shut down the pool - unless we already have.
                        if (!pool.isShutdown()) {
                            pool.shutdown();
                        }
                    }
                }
            }
            return next != null;
        }

        public T next() {
            T n = next;
            if (n != null) {
                // Delivered that one.
                next = null;
            } else {
                // Fails.
                throw new NoSuchElementException();
            }
            return n;
        }
    }
    public void test() {
        List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
        for (Integer i : Iterables.in(data)) {
            System.out.println(i);
        }
        Action<Integer, Integer> action = new Action<Integer, Integer>() {
            public Integer process(Integer f) {
                try {
                    // Wait that many seconds.
                    Thread.sleep(1000L * f);
                } catch (InterruptedException ex) {
                    // Just give up.
                }
                // Return it unchanged.
                return f;
            }
        };
        for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
            System.out.println(i);
        }
        System.out.println("Parallel Processed");
        for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
            System.out.println(i);
        }
    }
Note: Iterables.in just creates an Iterable<T> that wraps the passed Iterator<T>.
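The helper is not shown in the answer, so the following is only a guess at its shape: a small utility class whose in(Iterator<T>) method adapts an iterator for use in a for-each loop. The second overload, which accepts an Iterable, is an assumption added because test() also passes a List.

```java
import java.util.Iterator;

final class Iterables {

    private Iterables() {
    }

    // Wraps an Iterator so it can drive a for-each loop.
    // One-shot: a second loop would see the same, already consumed, iterator.
    static <T> Iterable<T> in(final Iterator<T> iterator) {
        return new Iterable<T>() {
            public Iterator<T> iterator() {
                return iterator;
            }
        };
    }

    // Assumed overload: something that is already Iterable is returned as-is.
    static <T> Iterable<T> in(Iterable<T> iterable) {
        return iterable;
    }
}
```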
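If the results must come out in input order, you could process Pair<Integer, F> (each item tagged with its position) and use a PriorityQueue for the output.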
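One way the sequence-number idea might look, as a sketch rather than the answerer's actual code. Each task returns its result tagged with the position of its input; the consumer parks early finishers in a PriorityQueue until the result it owes the caller arrives. OrderedAction, OrderedParallelMapper, Numbered and the `window` parameter (the maximum number of items between submission and delivery, assumed to be at least 1) are all hypothetical names.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the Action interface used in this thread.
interface OrderedAction<F, T> {
    T process(F f);
}

class OrderedParallelMapper<F, T> implements Iterator<T> {

    // A result tagged with the position of its input item.
    private static final class Numbered<V> implements Comparable<Numbered<V>> {
        final long seq;
        final V value;

        Numbered(long seq, V value) {
            this.seq = seq;
            this.value = value;
        }

        public int compareTo(Numbered<V> other) {
            return seq < other.seq ? -1 : (seq > other.seq ? 1 : 0);
        }
    }

    private final Iterator<F> input;
    private final OrderedAction<F, T> action;
    private final ExecutorService pool;
    private final CompletionService<Numbered<T>> completion;
    // Finished results that cannot be delivered yet, smallest sequence number first.
    private final PriorityQueue<Numbered<T>> ready = new PriorityQueue<Numbered<T>>();
    private final int window;
    private long submitted = 0;
    private long delivered = 0;

    OrderedParallelMapper(Iterator<F> input, OrderedAction<F, T> action, int threads, int window) {
        this.input = input;
        this.action = action;
        this.window = window;
        this.pool = Executors.newFixedThreadPool(threads);
        this.completion = new ExecutorCompletionService<Numbered<T>>(pool);
        fill();
    }

    // Keep at most `window` items between submission and delivery.
    private void fill() {
        while (submitted - delivered < window && input.hasNext()) {
            final F item = input.next();
            final long seq = submitted++;
            completion.submit(new Callable<Numbered<T>>() {
                public Numbered<T> call() {
                    return new Numbered<T>(seq, action.process(item));
                }
            });
        }
        if (!input.hasNext()) {
            pool.shutdown(); // tasks already submitted still run to completion
        }
    }

    public boolean hasNext() {
        return delivered < submitted;
    }

    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        try {
            // Collect finished results until the one owed to the caller has arrived.
            while (ready.isEmpty() || ready.peek().seq != delivered) {
                ready.add(completion.take().get());
            }
            T value = ready.poll().value;
            delivered++;
            fill();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

The window bounds memory use: a slow item at the head of the sequence stalls further submissions once `window` later items have been started, instead of letting finished results pile up.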
First, I wrapped the ItemMappingFunction in a Callable:
    private static class CallableAction<F extends Item, T extends Item>
            implements Callable<T> {

        private ItemMappingFunction<F, T> action;
        private F input;

        public CallableAction(ItemMappingFunction<F, T> action, F input) {
            this.action = action;
            this.input = input;
        }

        public T call() throws XPathException {
            return action.mapItem(input);
        }
    }
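I described my problem in terms of the standard Iterator class, but actually I'm using my own SequenceIterator interface, which has a single next() method that returns null at end-of-sequence.
I declare the class in terms of the "ordinary" mapping iterator like this: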
    public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {

        private ExecutorService service;
        private BlockingQueue<Future<T>> resultQueue =
                new LinkedBlockingQueue<Future<T>>();

        public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
            super(base, action);

            int maxThreads = Runtime.getRuntime().availableProcessors();
            maxThreads = maxThreads > 0 ? maxThreads : 1;
            service = Executors.newFixedThreadPool(maxThreads);

            // prime the queue
            int n = 0;
            while (n++ < maxThreads) {
                F item = (F) base.next();
                if (item == null) {
                    // Fewer input items than threads: nothing more to submit.
                    return;
                }
                mapOneItem(item);
            }
        }

        // mapOneItem() and next(), shown separately below, complete the class.
    }
where mapOneItem is:
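    private void mapOneItem(F in) throws XPathException {
        Future<T> future = service.submit(new CallableAction<F, T>(action, in));
        // Futures are queued in submission order, so results come out in input order.
        resultQueue.add(future);
    }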
When the client asks for the next item, I first submit the next input item to the executor service, then get the next output item, waiting for it to be available if necessary:
    public T next() throws XPathException {
        F nextIn = (F)base.next();
        if (nextIn != null) {
            mapOneItem(nextIn);
        }
        try {
            Future<T> future = resultQueue.poll();
            if (future == null) {
                // Nothing pending: end of sequence. Release the worker threads.
                service.shutdown();
                return null;
            } else {
                // Blocks until this result is available.
                return future.get();
            }
        } catch (InterruptedException e) {
            throw new XPathException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof XPathException) {
                throw (XPathException)e.getCause();
            }
            throw new XPathException(e);
        }
    }
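Since the code above depends on types from the author's own library (Item, SequenceIterator, XPathException), here is a rough self-contained restatement of the same queue-of-Futures idea against the standard Iterator interface. FutureAction and FutureQueueMapper are hypothetical names, and the explicit `threads` parameter is an assumption; the answer sizes the pool from availableProcessors().

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for ItemMappingFunction.
interface FutureAction<F, T> {
    T process(F f);
}

class FutureQueueMapper<F, T> implements Iterator<T> {
    private final Iterator<F> input;
    private final FutureAction<F, T> action;
    private final ExecutorService service;
    // Futures in submission order, so results are delivered in input order.
    private final Queue<Future<T>> pending = new ArrayDeque<Future<T>>();

    FutureQueueMapper(Iterator<F> input, FutureAction<F, T> action, int threads) {
        this.input = input;
        this.action = action;
        this.service = Executors.newFixedThreadPool(threads);
        // Prime the queue with up to one task per worker thread.
        for (int n = 0; n < threads && input.hasNext(); n++) {
            submitOne();
        }
        shutdownIfDrained();
    }

    private void submitOne() {
        final F item = input.next();
        pending.add(service.submit(new Callable<T>() {
            public T call() {
                return action.process(item);
            }
        }));
    }

    // pending is only empty once the input is exhausted, so no task is rejected.
    private void shutdownIfDrained() {
        if (pending.isEmpty()) {
            service.shutdown();
        }
    }

    public boolean hasNext() {
        return !pending.isEmpty();
    }

    public T next() {
        if (pending.isEmpty()) {
            throw new NoSuchElementException();
        }
        // Submit the next input item first, then wait for the oldest result.
        if (input.hasNext()) {
            submitOne();
        }
        Future<T> head = pending.remove();
        shutdownIfDrained();
        try {
            return head.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

Because one new task is submitted per delivered result, the number of pending Futures never exceeds the initial priming count plus one, which keeps both sequences out of memory.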