通用批处理的正确方法 Class
Right Approach for a General Purpose Batching Class
我正在寻找一个 class 可以让我添加要处理的项目,并且当项目计数等于批处理大小时执行一些操作。我会像这样使用它:
Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) {
public void onFlush(List<Token> tokens) {
rest.notifyBatch(tokens);
}
};
tokens.forEach((t)->batcher.add(t));
batcher.awaitDone();
在#awaitDone 之后,我知道所有令牌都已收到通知。 #onFlush 可能会做任何事情,例如,我可能想批量插入到数据库中。我希望将#onFlush 调用放入执行器中。
我为此想出了一个解决方案,但它似乎有很多代码,所以我的问题是,有没有更好的方法来解决这个问题?除了我实施的 class 之外,还有其他现有的 class 或更好的实施方法吗?好像我的解决方案有很多动人的部分。
这是我想出的代码:
/**
* Simple class to allow the batched processing of items and then to alternatively wait
* for all batches to be completed.
*/
public abstract class Batcher<T> {
private final int batchSize;
private final ArrayBlockingQueue<T> batch;
private final Executor executor;
private final Phaser phaser = new Phaser(1);
private final AtomicInteger processed = new AtomicInteger(0);
public Batcher(int batchSize, Executor executor) {
this.batchSize = batchSize;
this.executor = executor;
this.batch = new ArrayBlockingQueue<>(batchSize);
}
public void add(T item) {
processed.incrementAndGet();
while (!batch.offer(item)) {
flush();
}
}
public void addAll(Iterable<T> items) {
for (T item : items) {
add(item);
}
}
public int getProcessedCount() {
return processed.get();
}
public void flush() {
if (batch.isEmpty())
return;
final List<T> batched = new ArrayList<>(batchSize);
batch.drainTo(batched, batchSize);
if (!batched.isEmpty())
executor.execute(new PhasedRunnable(batched));
}
public abstract void onFlush(List<T> batch);
public void awaitDone() {
flush();
phaser.arriveAndAwaitAdvance();
}
public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
flush();
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private class PhasedRunnable implements Runnable {
private final List<T> batch;
private PhasedRunnable(List<T> batch) {
this.batch = batch;
phaser.register();
}
@Override
public void run() {
try {
onFlush(batch);
}
finally {
phaser.arrive();
}
}
}
}
一个Java 8 的解决方案会很棒。谢谢
令我吃惊的是,您的代码无法使用多个线程将项目添加到单个 Batcher
实例。如果我们将这种限制转化为指定的用例,就不需要在内部使用专门的并发类。所以我们可以累加成一个普通的ArrayList
,当容量用完时,用这个列表交换一个新的列表,而不需要复制项目。这允许将代码简化为
public class Batcher<T> implements Consumer<T> {
private final int batchSize;
private final Executor executor;
private final Consumer<List<T>> actualAction;
private final Phaser phaser = new Phaser(1);
private ArrayList<T> batch;
private int processed;
public Batcher(int batchSize, Executor executor, Consumer<List<T>> c) {
this.batchSize = batchSize;
this.executor = executor;
this.actualAction = c;
this.batch = new ArrayList<>(batchSize);
}
public void accept(T item) {
processed++;
if(batch.size()==batchSize) flush();
batch.add(item);
}
public int getProcessedCount() {
return processed;
}
public void flush() {
List<T> current = batch;
if (batch.isEmpty())
return;
batch = new ArrayList<>(batchSize);
phaser.register();
executor.execute(() -> {
try {
actualAction.accept(current);
}
finally {
phaser.arrive();
}
});
}
public void awaitDone() {
flush();
phaser.arriveAndAwaitAdvance();
}
public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
flush();
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
关于 Java 8 个具体改进,它使用 Consumer
允许通过 lambda 表达式指定最终操作,而无需子类化 Batcher
。此外,PhasedRunnable
被替换为 lambda 表达式。作为另一个简化,Batcher<T> implements Consumer<T>
消除了对方法 addAll
的需要,因为每个 Iterable
都支持 forEach(Consumer<? super T>)
.
所以用例现在看起来像:
Batcher<Token> batcher = new Batcher<>(
500, Executors.newFixedThreadPool(4), currTokens -> rest.notifyBatch(currTokens));
tokens.forEach(batcher);
batcher.awaitDone();
我正在寻找一个 class 可以让我添加要处理的项目,并且当项目计数等于批处理大小时执行一些操作。我会像这样使用它:
Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) {
public void onFlush(List<Token> tokens) {
rest.notifyBatch(tokens);
}
};
tokens.forEach((t)->batcher.add(t));
batcher.awaitDone();
在#awaitDone 之后,我知道所有令牌都已收到通知。 #onFlush 可能会做任何事情,例如,我可能想批量插入到数据库中。我希望将#onFlush 调用放入执行器中。
我为此想出了一个解决方案,但它似乎有很多代码,所以我的问题是,有没有更好的方法来解决这个问题?除了我实施的 class 之外,还有其他现有的 class 或更好的实施方法吗?好像我的解决方案有很多动人的部分。
这是我想出的代码:
/**
* Simple class to allow the batched processing of items and then to alternatively wait
* for all batches to be completed.
*/
public abstract class Batcher<T> {
private final int batchSize;
private final ArrayBlockingQueue<T> batch;
private final Executor executor;
private final Phaser phaser = new Phaser(1);
private final AtomicInteger processed = new AtomicInteger(0);
public Batcher(int batchSize, Executor executor) {
this.batchSize = batchSize;
this.executor = executor;
this.batch = new ArrayBlockingQueue<>(batchSize);
}
public void add(T item) {
processed.incrementAndGet();
while (!batch.offer(item)) {
flush();
}
}
public void addAll(Iterable<T> items) {
for (T item : items) {
add(item);
}
}
public int getProcessedCount() {
return processed.get();
}
public void flush() {
if (batch.isEmpty())
return;
final List<T> batched = new ArrayList<>(batchSize);
batch.drainTo(batched, batchSize);
if (!batched.isEmpty())
executor.execute(new PhasedRunnable(batched));
}
public abstract void onFlush(List<T> batch);
public void awaitDone() {
flush();
phaser.arriveAndAwaitAdvance();
}
public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
flush();
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private class PhasedRunnable implements Runnable {
private final List<T> batch;
private PhasedRunnable(List<T> batch) {
this.batch = batch;
phaser.register();
}
@Override
public void run() {
try {
onFlush(batch);
}
finally {
phaser.arrive();
}
}
}
}
一个Java 8 的解决方案会很棒。谢谢
令我吃惊的是,您的代码无法使用多个线程将项目添加到单个 Batcher
实例。如果我们将这种限制转化为指定的用例,就不需要在内部使用专门的并发类。所以我们可以累加成一个普通的ArrayList
,当容量用完时,用这个列表交换一个新的列表,而不需要复制项目。这允许将代码简化为
public class Batcher<T> implements Consumer<T> {
private final int batchSize;
private final Executor executor;
private final Consumer<List<T>> actualAction;
private final Phaser phaser = new Phaser(1);
private ArrayList<T> batch;
private int processed;
public Batcher(int batchSize, Executor executor, Consumer<List<T>> c) {
this.batchSize = batchSize;
this.executor = executor;
this.actualAction = c;
this.batch = new ArrayList<>(batchSize);
}
public void accept(T item) {
processed++;
if(batch.size()==batchSize) flush();
batch.add(item);
}
public int getProcessedCount() {
return processed;
}
public void flush() {
List<T> current = batch;
if (batch.isEmpty())
return;
batch = new ArrayList<>(batchSize);
phaser.register();
executor.execute(() -> {
try {
actualAction.accept(current);
}
finally {
phaser.arrive();
}
});
}
public void awaitDone() {
flush();
phaser.arriveAndAwaitAdvance();
}
public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
flush();
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
关于 Java 8 个具体改进,它使用 Consumer
允许通过 lambda 表达式指定最终操作,而无需子类化 Batcher
。此外,PhasedRunnable
被替换为 lambda 表达式。作为另一个简化,Batcher<T> implements Consumer<T>
消除了对方法 addAll
的需要,因为每个 Iterable
都支持 forEach(Consumer<? super T>)
.
所以用例现在看起来像:
Batcher<Token> batcher = new Batcher<>(
500, Executors.newFixedThreadPool(4), currTokens -> rest.notifyBatch(currTokens));
tokens.forEach(batcher);
batcher.awaitDone();