Synchronisation object to ensure all tasks are completed
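Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are:
- Each task takes a non-trivial amount of time to complete, and it is appropriate to execute the tasks in parallel.
- There are too many tasks to fit into memory (i.e. I cannot put each task's Future into a Collection and then call get on all the futures).
- I do not know how many tasks there will be (i.e. I cannot use a CountDownLatch).
- The ExecutorService may be shared, so I cannot use awaitTermination( long, TimeUnit ).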
For example, with Grand Central Dispatch, I might do something like this:
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
            // leave the group only after the count is updated, so the final print sees every increment
            dispatch_group_leave( latch )
        }
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
It produces output like the following (for 128 items):
Processed 128 items in 1.846794962883 seconds.
I tried something similar with a Phaser:
final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
The tasks do not always complete before the final print statement, so I might get output like the following (for 128 items):
Processed 121 items in 5.296 seconds.
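Is a Phaser the right object to use? The documentation indicates that it only supports 65,535 parties, so I would need to either batch the items to be processed or introduce some sort of Phaser tiering.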
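The problem with the Phaser usage in this example is that the CallerRunsPolicy allows tasks to execute on the initiating thread. Thus, while the loop is still in progress, the number of arrived parties can equal the number of registered parties, causing the phase to advance. (More generally, a pool thread can also finish a task before the loop registers the next one.) The solution is to initialize the Phaser with 1 party and then, when the loop is finished, arrive and wait for the other parties to arrive. This ensures that the phase does not advance to 1 until all tasks are complete.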
final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 1 ); // the submitting thread is a party, so phase 0 cannot advance mid-loop
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            try {
                processItem( item ); // method takes a non-trivial amount of time to run
                itemsProcessed.incrementAndGet();
            } finally {
                // deregister so the party count tracks only outstanding tasks (a Phaser allows at most 65,535 parties)
                latch.arriveAndDeregister();
            }
        }
    };
    executor.execute( task );
}
latch.arriveAndAwaitAdvance(); // arrive for the submitting thread, then wait for every task
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
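To check that this pattern copes with more items than the Phaser's 65,535-party limit, here is a self-contained sketch that wraps it in a small helper. PhaserTaskTracker and PhaserTrackerDemo are hypothetical names, not JDK classes, and the pool and queue sizes are arbitrary assumptions. Each task calls arriveAndDeregister() rather than arrive(), so with a bounded executor queue the number of registered parties stays bounded by the number of queued or running tasks rather than growing with every item.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not a JDK class): waits for an open-ended stream of tasks
// submitted to a possibly shared executor, using a Phaser as a growable latch.
class PhaserTaskTracker {
    // The submitting thread is party #1, so phase 0 cannot advance mid-loop.
    private final Phaser phaser = new Phaser(1);

    void submit(Executor executor, Runnable work) {
        phaser.register(); // one extra party per queued or running task
        try {
            executor.execute(() -> {
                try {
                    work.run();
                } finally {
                    // Deregister instead of arrive(): with a bounded executor queue the
                    // party count tracks only outstanding tasks, far below 65,535.
                    phaser.arriveAndDeregister();
                }
            });
        } catch (RejectedExecutionException e) {
            phaser.arriveAndDeregister(); // the task will never run, so drop its party
            throw e;
        }
    }

    // Call once, from the submitting thread, after the last submit().
    void awaitAll() {
        phaser.arriveAndAwaitAdvance();
    }
}

class PhaserTrackerDemo {
    // Runs `count` trivial tasks through a bounded pool (sizes are arbitrary) and
    // returns how many of them had finished when awaitAll() returned.
    static int run(int count) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                8, 8, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger processed = new AtomicInteger();
        PhaserTaskTracker tracker = new PhaserTaskTracker();
        try {
            for (int i = 0; i < count; i++) {
                tracker.submit(executor, processed::incrementAndGet);
            }
            tracker.awaitAll();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the submitting thread stays registered until awaitAll(), the phaser never reaches zero registered parties, so it does not terminate early even when every task has already deregistered.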
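If you are on Java 8, you can use CompletableFuture:
java.util.concurrent.CompletableFuture.allOf(CompletableFuture<?>... cfs)
This returns a CompletableFuture that completes when all of the futures in the passed array complete; call join() on it to wait. Note that it needs all of the futures at once, which is in tension with the memory constraint.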
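Because allOf needs every future up front, one way to reconcile it with the memory constraint is to process the items in fixed-size batches and wait for each batch before starting the next. This is a sketch under that assumption, not part of the original answer: BatchedAllOf, BatchedAllOfDemo, the batch size and the Consumer standing in for processItem are all hypothetical. The trade-off is that one slow task holds up its whole batch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical helper: at most `batchSize` futures are held in memory at a time.
class BatchedAllOf {
    static <T> void processAll(Iterator<T> items, int batchSize,
                               Consumer<? super T> processItem, Executor executor) {
        List<CompletableFuture<Void>> batch = new ArrayList<>(batchSize);
        while (items.hasNext()) {
            T item = items.next();
            batch.add(CompletableFuture.runAsync(() -> processItem.accept(item), executor));
            if (batch.size() == batchSize || !items.hasNext()) {
                // allOf returns a future that completes when every future in the batch
                // completes; join() blocks until then and rethrows any failure
                // wrapped in a CompletionException.
                CompletableFuture.allOf(batch.toArray(new CompletableFuture<?>[0])).join();
                batch.clear();
            }
        }
    }
}

class BatchedAllOfDemo {
    // Processes `count` lazily generated items and returns how many were processed.
    static int run(int count, int batchSize) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger processed = new AtomicInteger();
        // Stand-in for fetchItems(): items are generated on demand, never stored.
        Iterator<Integer> items = IntStream.range(0, count).iterator();
        try {
            BatchedAllOf.processAll(items, batchSize, i -> processed.incrementAndGet(), executor);
        } finally {
            executor.shutdown();
        }
        return processed.get();
    }
}
```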
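"to ensure an arbitrarily large number of tasks are completed": the simplest way is to maintain a counter of finished tasks, plus a blocking operation that waits until the counter reaches a given number. There is no ready-made class for this, but it is easy to write: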
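class EventCounter {
    long counter = 0;

    synchronized void up() {
        counter++;
        notifyAll();
    }

    // Object.wait() throws InterruptedException, so ensure() must declare it
    synchronized void ensure(long count) throws InterruptedException {
        while (counter < count) wait();
    }
}
"There are too many tasks to fit into memory": so when the number of running tasks gets too large, the process that submits new tasks must be paused. The simplest way is to treat the number of running tasks as a resource and count it with a semaphore:
final Semaphore runningTasksSema = new Semaphore(maxNumberOfRunningTasks);
final EventCounter eventCounter = new EventCounter();
for( final String item : fetchItems() ) {
    final Runnable task = new Runnable() {
        public void run() {
            try {
                processItem( item );
            } finally {
                // always free the slot and count the task, even if processItem throws
                runningTasksSema.release();
                eventCounter.up();
            }
        }
    };
    runningTasksSema.acquire(); // blocks while maxNumberOfRunningTasks tasks are outstanding
    executor.execute(task);
}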
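Putting the two pieces together gives the following self-contained sketch. SemaphoreThrottleDemo, the pool size and the permit count are assumptions for illustration, and an int range stands in for fetchItems(). The submitting thread counts how many tasks it submits, which gives it the number to pass to ensure() once the loop ends, even though the total was not known in advance.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Blocking counter of finished tasks, following the answer's EventCounter.
class EventCounter {
    private long counter = 0;

    synchronized void up() {
        counter++;
        notifyAll();
    }

    synchronized void ensure(long count) throws InterruptedException {
        while (counter < count) {
            wait();
        }
    }
}

class SemaphoreThrottleDemo {
    // Processes `count` generated items with at most `maxRunning` tasks queued or
    // running at once; returns how many items were processed.
    static int run(int count, int maxRunning) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Semaphore runningTasksSema = new Semaphore(maxRunning);
        EventCounter eventCounter = new EventCounter();
        AtomicInteger processed = new AtomicInteger();
        long submitted = 0;
        try {
            for (int i = 0; i < count; i++) { // stand-in for iterating fetchItems()
                runningTasksSema.acquire(); // blocks while maxRunning tasks are outstanding
                executor.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for processItem(item)
                    } finally {
                        runningTasksSema.release();
                        eventCounter.up();
                    }
                });
                submitted++;
            }
            eventCounter.ensure(submitted); // wait for every task this thread submitted
        } finally {
            executor.shutdown();
        }
        return processed.get();
    }
}
```

The semaphore bounds memory use even though Executors.newFixedThreadPool has an unbounded queue: no more than maxRunning tasks can be submitted and not yet finished at any moment.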
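When a thread wants to ensure that a given number of tasks has finished (for example, the submitting thread, once its loop has counted how many tasks it submitted), it calls: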
eventCounter.ensure(givenNumberOfFinishedTasks);
Asynchronous (non-blocking) versions of the runningTasksSema.acquire() and eventCounter.ensure() operations could be designed, but they would be more complicated.
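As a rough illustration of what a non-blocking ensure() might look like, the hypothetical AsyncEventCounter below returns a CompletableFuture from whenReaches() instead of blocking; the future completes once up() has been called enough times. It covers only the ensure() side, not a non-blocking acquire(), and it scans the waiter list linearly on every up(), which assumes only a few waiters at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical non-blocking counterpart of EventCounter.ensure().
class AsyncEventCounter {
    private long counter = 0;
    private final List<Waiter> waiters = new ArrayList<>(); // guarded by `this`

    private static final class Waiter {
        final long target;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(long target) {
            this.target = target;
        }
    }

    // Returns a future that completes once the counter has reached `count`.
    CompletableFuture<Void> whenReaches(long count) {
        synchronized (this) {
            if (counter < count) {
                Waiter w = new Waiter(count);
                waiters.add(w);
                return w.future;
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    void up() {
        List<CompletableFuture<Void>> ready = new ArrayList<>();
        synchronized (this) {
            counter++;
            waiters.removeIf(w -> {
                if (w.target <= counter) {
                    ready.add(w.future);
                    return true;
                }
                return false;
            });
        }
        // Complete outside the lock so dependent callbacks never run while holding it.
        ready.forEach(f -> f.complete(null));
    }
}
```

A caller would then write something like counter.whenReaches(submitted).thenRun(...) instead of blocking in ensure().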