在 Java 中编写多线程映射迭代器
Writing a multithreaded mapping iterator in Java
我有一个通用的映射迭代器:像这样的东西:
class Mapper<F, T> implements Iterator<T> {
private Iterator<F> input;
private Action<F, T> action;
public Mapper(input, action) {...}
public boolean hasNext() {
return input.hasNext();
}
public T next() {
return action.process(input.next());
}
}
现在,考虑到 action.process() 可能很耗时,我想通过使用多个线程并行处理来自输入的项目来提高性能。我想分配一个 N 个工作线程的池,并将项目分配给这些线程进行处理。这应该会发生 "behind the scenes",因此客户端代码只会看到一个迭代器。代码应避免在内存中保存输入或输出序列。
为了增加一点变化,我想要两个版本的解决方案,一个保留顺序(最终迭代器以与输入迭代器相同的顺序交付项目),另一个不一定保留顺序(每个输出项目有货即送。
我有点让这个工作正常,但代码似乎令人费解且不可靠,我不确定它是否使用了最佳实践。
关于最简单和最可靠的实现方法有什么建议吗?我正在寻找适用于 JDK 6 的东西,并且我希望尽可能避免引入对外部 libraries/frameworks 的依赖。
为了并行调用 action.process
,需要并行调用 next()
。这不是好的做法。相反,您可以使用 ExecutorCompletionService.
见
不幸的是,我认为这只会让您选择保留顺序。
我认为它不能与并行线程一起工作,因为 hasNext() 可能 return 为真,但是当线程调用 next() 时可能没有更多元素。最好只使用 next() ,当没有更多元素时 return null
我建议查看 JDK 执行器框架。为您的操作创建任务 (运行nables)。 运行 如果需要,则使用线程池并行处理它们,如果不需要,则按顺序处理。如果最后需要排序,请给出任务序号。但是正如在其他答案中指出的那样,迭代器对您来说效果不佳,因为通常不会并行调用 next() 。那么您甚至需要迭代器还是只是为了处理任务?
我会为线程使用线程池,并使用 BlockingQueue
从池中获取数据。
这似乎适用于我的简单测试用例。
interface Action<F, T> {
public T process(F f);
}
class Mapper<F, T> implements Iterator<T> {
protected final Iterator<F> input;
protected final Action<F, T> action;
public Mapper(Iterator<F> input, Action<F, T> action) {
this.input = input;
this.action = action;
}
@Override
public boolean hasNext() {
return input.hasNext();
}
@Override
public T next() {
return action.process(input.next());
}
}
class ParallelMapper<F, T> extends Mapper<F, T> {
// The pool.
final ExecutorService pool;
// The queue.
final BlockingQueue<T> queue;
// The next one to deliver.
private T next = null;
public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
super(input, action);
// Start my pool.
pool = Executors.newFixedThreadPool(threads);
// And the queue.
queue = new ArrayBlockingQueue<>(queueLength);
}
class Worker implements Runnable {
final F f;
private T t;
public Worker(F f) {
this.f = f;
}
@Override
public void run() {
try {
queue.put(action.process(f));
} catch (InterruptedException ex) {
// Not sure what you can do here.
}
}
}
@Override
public boolean hasNext() {
// All done if delivered it and the input is empty and the queue is empty and the threads are finished.
while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
// First look in the queue.
next = queue.poll();
if (next == null) {
// Queue empty.
if (input.hasNext()) {
// Start a new worker.
pool.execute(new Worker(input.next()));
}
} else {
// Input exhausted - shut down the pool - unless we already have.
if (!pool.isShutdown()) {
pool.shutdown();
}
}
}
return next != null;
}
@Override
public T next() {
T n = next;
if (n != null) {
// Delivered that one.
next = null;
} else {
// Fails.
throw new NoSuchElementException();
}
return n;
}
}
public void test() {
List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
System.out.println("Data");
for (Integer i : Iterables.in(data)) {
System.out.println(i);
}
Action<Integer, Integer> action = new Action<Integer, Integer>() {
@Override
public Integer process(Integer f) {
try {
// Wait that many seconds.
Thread.sleep(1000L * f);
} catch (InterruptedException ex) {
// Just give up.
}
// Return it unchanged.
return f;
}
};
System.out.println("Processed");
for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
System.out.println(i);
}
System.out.println("Parallel Processed");
for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
System.out.println(i);
}
}
注意:Iterables.in(Iterator<T>)
只是创建一个 Iterable<T>
来封装传递的 Iterator<T>
。
对于您的顺序,您可以处理 Pair<Integer,F>
并使用 PriorityQueue
作为线程输出。然后您可以安排按顺序拉动它们。
好的,谢谢大家。这就是我所做的。
首先,我将 ItemMappingFunction 包装在 Callable 中:
private static class CallableAction<F extends Item, T extends Item>
implements Callable<T> {
private ItemMappingFunction<F, T> action;
private F input;
public CallableAction(ItemMappingFunction<F, T> action, F input) {
this.action = action;
this.input = input;
}
public T call() throws XPathException {
return action.mapItem(input);
}
}
我用标准迭代器 class 描述了我的问题,但实际上我使用的是我自己的 SequenceIterator 接口,它有一个 next() 方法,最后 returns null-顺序。
我根据 "ordinary" 映射迭代器声明 class,如下所示:
public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {
private ExecutorService service;
private BlockingQueue<Future<T>> resultQueue =
new LinkedBlockingQueue<Future<T>>();
在初始化时,我创建服务并启动队列:
public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
super(base, action);
int maxThreads = Runtime.getRuntime().availableProcessors();
maxThreads = maxThreads > 0 ? maxThreads : 1;
service = Executors.newFixedThreadPool(maxThreads);
// prime the queue
int n = 0;
while (n++ < maxThreads) {
F item = (F) base.next();
if (item == null) {
return;
}
mapOneItem(item);
}
}
其中 mapOneItem 是:
private void mapOneItem(F in) throws XPathException {
Future<T> future = service.submit(new CallableAction(action, in));
resultQueue.add(future);
}
当client请求下一个item时,我先将下一个input item提交给executor service,然后获取下一个output item,如果需要就等待它可用:
public T next() throws XPathException {
F nextIn = (F)base.next();
if (nextIn != null) {
mapOneItem(nextIn);
}
try {
Future<T> future = resultQueue.poll();
if (future == null) {
service.shutdown();
return null;
} else {
return future.get();
}
} catch (InterruptedException e) {
throw new XPathException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof XPathException) {
throw (XPathException)e.getCause();
}
throw new XPathException(e);
}
}
我有一个通用的映射迭代器:像这样的东西:
class Mapper<F, T> implements Iterator<T> {
private Iterator<F> input;
private Action<F, T> action;
public Mapper(input, action) {...}
public boolean hasNext() {
return input.hasNext();
}
public T next() {
return action.process(input.next());
}
}
现在,考虑到 action.process() 可能很耗时,我想通过使用多个线程并行处理来自输入的项目来提高性能。我想分配一个 N 个工作线程的池,并将项目分配给这些线程进行处理。这应该会发生 "behind the scenes",因此客户端代码只会看到一个迭代器。代码应避免在内存中保存输入或输出序列。
为了增加一点变化,我想要两个版本的解决方案,一个保留顺序(最终迭代器以与输入迭代器相同的顺序交付项目),另一个不一定保留顺序(每个输出项目有货即送。
我有点让这个工作正常,但代码似乎令人费解且不可靠,我不确定它是否使用了最佳实践。
关于最简单和最可靠的实现方法有什么建议吗?我正在寻找适用于 JDK 6 的东西,并且我希望尽可能避免引入对外部 libraries/frameworks 的依赖。
为了并行调用 action.process
,需要并行调用 next()
。这不是好的做法。相反,您可以使用 ExecutorCompletionService.
见
不幸的是,我认为这只会让您选择保留顺序。
我认为它不能与并行线程一起工作,因为 hasNext() 可能 return 为真,但是当线程调用 next() 时可能没有更多元素。最好只使用 next() ,当没有更多元素时 return null
我建议查看 JDK 执行器框架。为您的操作创建任务 (运行nables)。 运行 如果需要,则使用线程池并行处理它们,如果不需要,则按顺序处理。如果最后需要排序,请给出任务序号。但是正如在其他答案中指出的那样,迭代器对您来说效果不佳,因为通常不会并行调用 next() 。那么您甚至需要迭代器还是只是为了处理任务?
我会为线程使用线程池,并使用 BlockingQueue
从池中获取数据。
这似乎适用于我的简单测试用例。
interface Action<F, T> {
public T process(F f);
}
class Mapper<F, T> implements Iterator<T> {
protected final Iterator<F> input;
protected final Action<F, T> action;
public Mapper(Iterator<F> input, Action<F, T> action) {
this.input = input;
this.action = action;
}
@Override
public boolean hasNext() {
return input.hasNext();
}
@Override
public T next() {
return action.process(input.next());
}
}
class ParallelMapper<F, T> extends Mapper<F, T> {
// The pool.
final ExecutorService pool;
// The queue.
final BlockingQueue<T> queue;
// The next one to deliver.
private T next = null;
public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
super(input, action);
// Start my pool.
pool = Executors.newFixedThreadPool(threads);
// And the queue.
queue = new ArrayBlockingQueue<>(queueLength);
}
class Worker implements Runnable {
final F f;
private T t;
public Worker(F f) {
this.f = f;
}
@Override
public void run() {
try {
queue.put(action.process(f));
} catch (InterruptedException ex) {
// Not sure what you can do here.
}
}
}
@Override
public boolean hasNext() {
// All done if delivered it and the input is empty and the queue is empty and the threads are finished.
while (next == null && (input.hasNext() || !queue.isEmpty() || !pool.isTerminated())) {
// First look in the queue.
next = queue.poll();
if (next == null) {
// Queue empty.
if (input.hasNext()) {
// Start a new worker.
pool.execute(new Worker(input.next()));
}
} else {
// Input exhausted - shut down the pool - unless we already have.
if (!pool.isShutdown()) {
pool.shutdown();
}
}
}
return next != null;
}
@Override
public T next() {
T n = next;
if (n != null) {
// Delivered that one.
next = null;
} else {
// Fails.
throw new NoSuchElementException();
}
return n;
}
}
public void test() {
List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
System.out.println("Data");
for (Integer i : Iterables.in(data)) {
System.out.println(i);
}
Action<Integer, Integer> action = new Action<Integer, Integer>() {
@Override
public Integer process(Integer f) {
try {
// Wait that many seconds.
Thread.sleep(1000L * f);
} catch (InterruptedException ex) {
// Just give up.
}
// Return it unchanged.
return f;
}
};
System.out.println("Processed");
for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
System.out.println(i);
}
System.out.println("Parallel Processed");
for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
System.out.println(i);
}
}
注意:Iterables.in(Iterator<T>)
只是创建一个 Iterable<T>
来封装传递的 Iterator<T>
。
对于您的顺序,您可以处理 Pair<Integer,F>
并使用 PriorityQueue
作为线程输出。然后您可以安排按顺序拉动它们。
好的,谢谢大家。这就是我所做的。
首先,我将 ItemMappingFunction 包装在 Callable 中:
private static class CallableAction<F extends Item, T extends Item>
implements Callable<T> {
private ItemMappingFunction<F, T> action;
private F input;
public CallableAction(ItemMappingFunction<F, T> action, F input) {
this.action = action;
this.input = input;
}
public T call() throws XPathException {
return action.mapItem(input);
}
}
我用标准迭代器 class 描述了我的问题,但实际上我使用的是我自己的 SequenceIterator 接口,它有一个 next() 方法,最后 returns null-顺序。
我根据 "ordinary" 映射迭代器声明 class,如下所示:
public class MultithreadedMapper<F extends Item, T extends Item> extends Mapper<F, T> {
private ExecutorService service;
private BlockingQueue<Future<T>> resultQueue =
new LinkedBlockingQueue<Future<T>>();
在初始化时,我创建服务并启动队列:
public MultithreadedMapper(SequenceIterator base, ItemMappingFunction<F, T> action) throws XPathException {
super(base, action);
int maxThreads = Runtime.getRuntime().availableProcessors();
maxThreads = maxThreads > 0 ? maxThreads : 1;
service = Executors.newFixedThreadPool(maxThreads);
// prime the queue
int n = 0;
while (n++ < maxThreads) {
F item = (F) base.next();
if (item == null) {
return;
}
mapOneItem(item);
}
}
其中 mapOneItem 是:
private void mapOneItem(F in) throws XPathException {
Future<T> future = service.submit(new CallableAction(action, in));
resultQueue.add(future);
}
当client请求下一个item时,我先将下一个input item提交给executor service,然后获取下一个output item,如果需要就等待它可用:
public T next() throws XPathException {
F nextIn = (F)base.next();
if (nextIn != null) {
mapOneItem(nextIn);
}
try {
Future<T> future = resultQueue.poll();
if (future == null) {
service.shutdown();
return null;
} else {
return future.get();
}
} catch (InterruptedException e) {
throw new XPathException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof XPathException) {
throw (XPathException)e.getCause();
}
throw new XPathException(e);
}
}