Right Approach for a General Purpose Batching Class

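I'm looking for a class that lets me add items to be processed and that performs some operation whenever the item count reaches the batch size. I would use it like this: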

   Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) {
      public void onFlush(List<Token> tokens) {
          rest.notifyBatch(tokens);
      }
   };

   tokens.forEach((t)->batcher.add(t));
   batcher.awaitDone();

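After #awaitDone returns, I know that all tokens have been notified. #onFlush might do anything; for example, I might want to batch-insert into a database. I want the #onFlush calls to be submitted to an Executor.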

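I came up with a solution for this, but it seems like a lot of code, so my question is: is there a better way to do this? Is there an existing class, or a better way to implement it, than the class I wrote? My solution seems to have a lot of moving parts.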

Here is the code I came up with:

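import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple class to allow the batched processing of items and then to optionally wait
 * for all batches to be completed.
 */
public abstract class Batcher<T> {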

    private final int batchSize;
    private final ArrayBlockingQueue<T> batch;
    private final Executor executor;
    private final Phaser phaser = new Phaser(1);
    private final AtomicInteger processed = new AtomicInteger(0);

    public Batcher(int batchSize, Executor executor) {
        this.batchSize = batchSize;
        this.executor = executor;
        this.batch = new ArrayBlockingQueue<>(batchSize);
    }

    public void add(T item) {
        processed.incrementAndGet();
        while (!batch.offer(item)) {
            flush();
        }
    }

    public void addAll(Iterable<T> items) {
        for (T item : items) {
            add(item);
        }
    }

    public int getProcessedCount() {
        return processed.get();
    }

    public void flush() {
        if (batch.isEmpty())
            return;

        final List<T> batched = new ArrayList<>(batchSize);
        batch.drainTo(batched, batchSize);
        if (!batched.isEmpty())
            executor.execute(new PhasedRunnable(batched));
    }

    public abstract void onFlush(List<T> batch);

    public void awaitDone() {
        flush();
        phaser.arriveAndAwaitAdvance();
    }

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
        flush();
        try {
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private class PhasedRunnable implements Runnable {
        private final List<T> batch;

        private PhasedRunnable(List<T> batch) {
            this.batch = batch;
            phaser.register();
        }

        @Override
        public void run() {
            try {
                onFlush(batch);
            }
            finally {
                phaser.arrive();
            }
        }
    }
}

A Java 8 solution would be great. Thanks.

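What strikes me is that your code cannot be used to add items to a single Batcher instance from multiple threads. If we turn that limitation into the specified use case, there is no need to use specialized concurrent classes internally. We can accumulate into an ordinary ArrayList and, when its capacity is exhausted, swap that list for a new one without copying any items. This allows the code to be simplified to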

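import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class Batcher<T> implements Consumer<T> {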

    private final int batchSize;
    private final Executor executor;
    private final Consumer<List<T>> actualAction;
    private final Phaser phaser = new Phaser(1);
    private ArrayList<T> batch;
    private int processed;

    public Batcher(int batchSize, Executor executor, Consumer<List<T>> c) {
        this.batchSize = batchSize;
        this.executor = executor;
        this.actualAction = c;
        this.batch = new ArrayList<>(batchSize);
    }

    public void accept(T item) {
        processed++;
        if(batch.size()==batchSize) flush();
        batch.add(item);
    }

    public int getProcessedCount() {
        return processed;
    }

    public void flush() {
        List<T> current = batch;
        if (batch.isEmpty())
            return;
        batch = new ArrayList<>(batchSize);
        phaser.register();
        executor.execute(() -> {
            try {
                actualAction.accept(current);
            }
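            finally {
                // Deregister the task's party so it does not carry over into the next
                // phase; with a plain arrive(), a later awaitDone() call would block forever.
                phaser.arriveAndDeregister();
            }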
        });
    }

    public void awaitDone() {
        flush();
        phaser.arriveAndAwaitAdvance();
    }

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
        flush();
        try {
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

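Regarding Java 8 specific improvements: it uses Consumer, which allows the final action to be specified with a lambda expression, without subclassing Batcher. Further, PhasedRunnable is replaced by a lambda expression. As another simplification, having Batcher<T> implement Consumer<T> eliminates the need for an addAll method, because every Iterable supports forEach(Consumer<? super T>).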

So the use case now looks like:

Batcher<Token> batcher = new Batcher<>(
    500, Executors.newFixedThreadPool(4), currTokens -> rest.notifyBatch(currTokens));

tokens.forEach(batcher);
batcher.awaitDone();
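
If you want to try the batching behavior end to end, here is a minimal self-contained sketch. It repeats a condensed version of the Batcher (no timed awaitDone, no processed counter) and records the size of every batch that gets flushed. The names BatcherDemo and runDemo, the Integer items, and the batch size of 3 are assumptions made for illustration only. The sketch uses arriveAndDeregister() in the task's finally block so that finished batches do not stay registered with the Phaser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatcherDemo {

    // Condensed single-producer Batcher: items must be added from one thread only.
    static class Batcher<T> implements Consumer<T> {
        private final int batchSize;
        private final Executor executor;
        private final Consumer<List<T>> action;
        private final Phaser phaser = new Phaser(1); // the one party is the producer thread
        private ArrayList<T> batch;

        Batcher(int batchSize, Executor executor, Consumer<List<T>> action) {
            this.batchSize = batchSize;
            this.executor = executor;
            this.action = action;
            this.batch = new ArrayList<>(batchSize);
        }

        @Override
        public void accept(T item) {
            // A full batch is handed off when the next item arrives.
            if (batch.size() == batchSize) flush();
            batch.add(item);
        }

        public void flush() {
            if (batch.isEmpty()) return;
            List<T> current = batch;            // hand the filled list to the task...
            batch = new ArrayList<>(batchSize); // ...and continue with a fresh one
            phaser.register();                  // one extra party per pending batch
            executor.execute(() -> {
                try {
                    action.accept(current);
                } finally {
                    phaser.arriveAndDeregister();
                }
            });
        }

        public void awaitDone() {
            flush();                        // submit the last, possibly partial, batch
            phaser.arriveAndAwaitAdvance(); // returns once every pending batch has arrived
        }
    }

    // Feeds `count` integers through a Batcher and returns the observed batch sizes, sorted.
    static List<Integer> runDemo(int count, int batchSize) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<Integer> sizes = new ConcurrentLinkedQueue<>();
        try {
            Batcher<Integer> batcher =
                new Batcher<>(batchSize, pool, b -> sizes.add(b.size()));
            List<Integer> items =
                IntStream.range(0, count).boxed().collect(Collectors.toList());
            items.forEach(batcher); // Iterable.forEach accepts the Batcher directly
            batcher.awaitDone();
        } finally {
            pool.shutdown();
        }
        // Batches run concurrently, so completion order varies; sort for a stable result.
        return sizes.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10, 3)); // prints [1, 3, 3, 3]
    }
}
```

With ten items and a batch size of 3, three full batches are handed to the executor while items are being added, and the remaining single item is flushed by awaitDone().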