异步单任务执行器
Asynchronous single task executor
我怀疑下面的解决方案是否适用于以下情况:
- 在多用户应用程序中,任何用户都可以通过单击按钮开始处理给定数据
- 处理需要很长时间,因此应该异步执行以不阻塞 GUI
- 如果一个用户已经开始处理,其他请求应该被拒绝直到完成
下面是用于解决这个问题的代码:
public class Processor {
private final ExecutorService execService = Executors.newSingleThreadExecutor();
private final Object monitor = new Object();
private AtomicReference<Task> runningTask = new AtomicReference<>(null);
public Optional<CompletableFuture<String>> processDataAsync(String data) {
if (runningTask.get() != null)
return Optional.empty(); //rejecting data process request because another request is already being served
synchronized (monitor) {
if (runningTask.get() != null)
return Optional.empty();
CompletableFuture<String> f = new CompletableFuture<>();
f.whenComplete((r, e) -> runningTask.set(null)); //when processing completes, another data process request can be accepted
Task task = new Task(f, data);
runningTask.set(task);
execService.submit(task);
return Optional.of(f);
}
}
}
Task
是 Runnable
如下:
public class Task implements Runnable {
private final CompletableFuture<String> result;
private final String data;
public Task(CompletableFuture<String> result, String data) {
this.result = result;
this.data = data;
}
@Override
public void run() {
String processingResult = processData(data); //does some blocking stuff with data, returning result of processing
result.complete(processingResult);
}
}
这里让我感到困惑的是 processDataAsync
中的同步(即阻塞)。我知道这里的阻塞很短而且并不重要,但是异步方法不应该总是在没有阻塞的情况下实现吗?如果是这样,我无法想象没有同步如何实现“单一处理”。
我认为您可以尝试使用从 Executors.newSingleThreadExecutor()
返回的 ExecutorService
的功能。在上述方法的 JavaDoc 中解释说:
/**
* Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {/*...*/}
当您将 Runnable
提交到方法返回的 ExecutorService
时,它会被放入队列并仅在前一个完成后执行。我想这就是你需要的。
也许我误解了问题,但你似乎把情况复杂化了。与其跟踪任务,不如跟踪 ExecutorService#submit
返回的 Future
。 Future
对象是您返回到正在执行的任务的系绳。
为Future
定义一个成员字段。
Future future ;
在请求处理时测试 Future
。调用 Future#isDone
进行测试。 Javadoc 说:
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
if( Objects.isNull( this.future ) || this.future.isDone() ) {
… proceed with request to process data.
this.future = executorService.submit( … ) ;
return Optional.of( this.future ) ;
} else {
… Refuse the request to process data.
… Do *not* submit any task to the executor service.
return Optional.empty() ;
}
TaskMaster
解决方案
在各种评论中,您提供了有关问题的更多详细信息。
您想从多个线程向单个对象提交任务。为了清楚起见,我们称该对象为 TaskMaster
。 TaskMaster
实例跟踪其嵌套的执行程序服务当前是否正在处理任务。
- 如果忙于处理任务,则拒绝任何传入的任务。该拒绝采用
Optional< Future >
. 的形式
- 如果当前未处理任务,则接受招标任务,并将其分配给执行程序服务以立即执行。此接受采用已加载
Optional< Future >
. 的形式
由于上面讨论的代码将被跨线程访问,我们必须以thread-safe的方式保护Future future ;
。一种简单的方法是在将任务提交给 TaskMaster
.
的唯一方法上标记 synchronized
package singletask;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
// Runs one task at a time, rejecting any task tendered while already executing a task.
public class TaskMaster
{
// Member fields.
private final ExecutorService executorService;
private Future future;
// Constructor
public TaskMaster ( )
{
this.executorService = Executors.newSingleThreadExecutor();
}
public synchronized Optional < Future > tender ( Runnable task )
{
if ( Objects.isNull( this.future ) || this.future.isDone() )
{
// Proceed with immediate execution of the tendered task.
this.future = executorService.submit( task );
return Optional.of( this.future );
} else
{
// Already busy on a task. Reject this tendered task.
return Optional.empty();
}
}
public void shutdownAndAwaitTerminationOfExecutorService ( )
{
if ( Objects.isNull( this.executorService ) ) { return; }
this.executorService.shutdown(); // Stop new tasks from being submitted.
try
{
// Wait a while for existing tasks to terminate
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
{
this.executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
System.err.println( "Pool did not terminate." );
}
}
catch ( InterruptedException ex )
{
// (Re-)Cancel if current thread also interrupted
this.executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
用法如下所示。 注意: 多线程调用 System.out.println
do not 总是按时间顺序出现在控制台上。始终包含并检查时间戳以验证订单。
package singletask;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
Runnable task = ( ) -> {
UUID taskID = UUID.randomUUID();
System.out.println( "Starting task " + taskID + " at " + Instant.now() );
// Pretend to do some long hard work, by sleeping.
try { Thread.sleep( Duration.ofSeconds( 5 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "Ending task " + taskID + " at " + Instant.now() );
};
TaskMaster taskMaster = new TaskMaster();
Optional < Future > f1 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
try { Thread.sleep( Duration.ofMillis( 500 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "f1 = " + f1 );
Optional < Future > f2 = taskMaster.tender( task ); // We expect rejection, showing `Optional.empty`.
System.out.println( "f2 = " + f2 );
try { Thread.sleep( Duration.ofSeconds( 7 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
Optional < Future > f3 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
System.out.println( "f3 = " + f3 );
// Attempt a graceful shutwdown.
taskMaster.shutdownAndAwaitTerminationOfExecutorService();
System.out.println( "Demo ending at " + Instant.now() );
}
}
当运行.
Starting task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:42:58.516852Z
f1 = Optional[java.util.concurrent.FutureTask@1fb3ebeb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31befd9f[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
f2 = Optional.empty
Ending task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:43:03.530138Z
Starting task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:06.011485Z
f3 = Optional[java.util.concurrent.FutureTask@816f27d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1218025c[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
Ending task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:11.013576Z
Demo ending at 2022-02-03T06:43:11.014180Z
自定义ExecutorService
虽然上面的 TaskMaster
代码有效,并提供了您要求的 Optional
对象,但我会推荐另一种方法。
您可以制作自己的 ExecutorService
版本。您的实施可以做一些类似于我们上面看到的事情,跟踪单个任务的执行。
与其返回 Optional< Future >
,更正统的方法是提供一个 submit
方法实现:
- Returns a
Future
如果招标任务可以立即执行,或者...
- 抛出
RejectedExecutionException
因为任务已经 运行ning。
此行为在 ExecutorService
的 Javadoc 中定义。您对该自定义执行程序服务的招标任务的任何方法都会捕获此异常,而不是检查 Optional
.
换句话说,修改您评论的摘录:
If two users simultaneously try to request data processing, only one of them will succeed and receive a Future
, and another will see an exception thrown, indicating that the request was rejected.
有了这个自定义执行器服务,调用的程序员需要学习的东西就更少了。调用程序员不需要理解 TaskMaster
class 的语义,他们只需要理解常见的 ExecutorService
行为。
提示:AbstractExecutorService
class 可能会让您更轻松地创建自己的执行程序服务。
我怀疑下面的解决方案是否适用于以下情况:
- 在多用户应用程序中,任何用户都可以通过单击按钮开始处理给定数据
- 处理需要很长时间,因此应该异步执行以不阻塞 GUI
- 如果一个用户已经开始处理,其他请求应该被拒绝直到完成
下面是用于解决这个问题的代码:
public class Processor {
private final ExecutorService execService = Executors.newSingleThreadExecutor();
private final Object monitor = new Object();
private AtomicReference<Task> runningTask = new AtomicReference<>(null);
public Optional<CompletableFuture<String>> processDataAsync(String data) {
if (runningTask.get() != null)
return Optional.empty(); //rejecting data process request because another request is already being served
synchronized (monitor) {
if (runningTask.get() != null)
return Optional.empty();
CompletableFuture<String> f = new CompletableFuture<>();
f.whenComplete((r, e) -> runningTask.set(null)); //when processing completes, another data process request can be accepted
Task task = new Task(f, data);
runningTask.set(task);
execService.submit(task);
return Optional.of(f);
}
}
}
Task
是 Runnable
如下:
public class Task implements Runnable {
private final CompletableFuture<String> result;
private final String data;
public Task(CompletableFuture<String> result, String data) {
this.result = result;
this.data = data;
}
@Override
public void run() {
String processingResult = processData(data); //does some blocking stuff with data, returning result of processing
result.complete(processingResult);
}
}
这里让我感到困惑的是 processDataAsync
中的同步(即阻塞)。我知道这里的阻塞很短而且并不重要,但是异步方法不应该总是在没有阻塞的情况下实现吗?如果是这样,我无法想象没有同步如何实现“单一处理”。
我认为您可以尝试使用从 Executors.newSingleThreadExecutor()
返回的 ExecutorService
的功能。在上述方法的 JavaDoc 中解释说:
/**
* Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {/*...*/}
当您将 Runnable
提交到方法返回的 ExecutorService
时,它会被放入队列并仅在前一个完成后执行。我想这就是你需要的。
也许我误解了问题,但你似乎把情况复杂化了。与其跟踪任务,不如跟踪 ExecutorService#submit
返回的 Future
。 Future
对象是您返回到正在执行的任务的系绳。
为Future
定义一个成员字段。
Future future ;
在请求处理时测试 Future
。调用 Future#isDone
进行测试。 Javadoc 说:
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
if( Objects.isNull( this.future ) || this.future.isDone() ) {
… proceed with request to process data.
this.future = executorService.submit( … ) ;
return Optional.of( this.future ) ;
} else {
… Refuse the request to process data.
… Do *not* submit any task to the executor service.
return Optional.empty() ;
}
TaskMaster
解决方案
在各种评论中,您提供了有关问题的更多详细信息。
您想从多个线程向单个对象提交任务。为了清楚起见,我们称该对象为 TaskMaster
。 TaskMaster
实例跟踪其嵌套的执行程序服务当前是否正在处理任务。
- 如果忙于处理任务,则拒绝任何传入的任务。该拒绝采用
Optional< Future >
. 的形式
- 如果当前未处理任务,则接受招标任务,并将其分配给执行程序服务以立即执行。此接受采用已加载
Optional< Future >
. 的形式
由于上面讨论的代码将被跨线程访问,我们必须以thread-safe的方式保护Future future ;
。一种简单的方法是在将任务提交给 TaskMaster
.
synchronized
package singletask;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
// Runs one task at a time, rejecting any task tendered while already executing a task.
public class TaskMaster
{
// Member fields.
private final ExecutorService executorService;
private Future future;
// Constructor
public TaskMaster ( )
{
this.executorService = Executors.newSingleThreadExecutor();
}
public synchronized Optional < Future > tender ( Runnable task )
{
if ( Objects.isNull( this.future ) || this.future.isDone() )
{
// Proceed with immediate execution of the tendered task.
this.future = executorService.submit( task );
return Optional.of( this.future );
} else
{
// Already busy on a task. Reject this tendered task.
return Optional.empty();
}
}
public void shutdownAndAwaitTerminationOfExecutorService ( )
{
if ( Objects.isNull( this.executorService ) ) { return; }
this.executorService.shutdown(); // Stop new tasks from being submitted.
try
{
// Wait a while for existing tasks to terminate
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
{
this.executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
System.err.println( "Pool did not terminate." );
}
}
catch ( InterruptedException ex )
{
// (Re-)Cancel if current thread also interrupted
this.executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
用法如下所示。 注意: 多线程调用 System.out.println
do not 总是按时间顺序出现在控制台上。始终包含并检查时间戳以验证订单。
package singletask;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
Runnable task = ( ) -> {
UUID taskID = UUID.randomUUID();
System.out.println( "Starting task " + taskID + " at " + Instant.now() );
// Pretend to do some long hard work, by sleeping.
try { Thread.sleep( Duration.ofSeconds( 5 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "Ending task " + taskID + " at " + Instant.now() );
};
TaskMaster taskMaster = new TaskMaster();
Optional < Future > f1 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
try { Thread.sleep( Duration.ofMillis( 500 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "f1 = " + f1 );
Optional < Future > f2 = taskMaster.tender( task ); // We expect rejection, showing `Optional.empty`.
System.out.println( "f2 = " + f2 );
try { Thread.sleep( Duration.ofSeconds( 7 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
Optional < Future > f3 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
System.out.println( "f3 = " + f3 );
// Attempt a graceful shutwdown.
taskMaster.shutdownAndAwaitTerminationOfExecutorService();
System.out.println( "Demo ending at " + Instant.now() );
}
}
当运行.
Starting task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:42:58.516852Z
f1 = Optional[java.util.concurrent.FutureTask@1fb3ebeb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31befd9f[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
f2 = Optional.empty
Ending task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:43:03.530138Z
Starting task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:06.011485Z
f3 = Optional[java.util.concurrent.FutureTask@816f27d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1218025c[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
Ending task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:11.013576Z
Demo ending at 2022-02-03T06:43:11.014180Z
自定义ExecutorService
虽然上面的 TaskMaster
代码有效,并提供了您要求的 Optional
对象,但我会推荐另一种方法。
您可以制作自己的 ExecutorService
版本。您的实施可以做一些类似于我们上面看到的事情,跟踪单个任务的执行。
与其返回 Optional< Future >
,更正统的方法是提供一个 submit
方法实现:
- Returns a
Future
如果招标任务可以立即执行,或者... - 抛出
RejectedExecutionException
因为任务已经 运行ning。
此行为在 ExecutorService
的 Javadoc 中定义。您对该自定义执行程序服务的招标任务的任何方法都会捕获此异常,而不是检查 Optional
.
换句话说,修改您评论的摘录:
If two users simultaneously try to request data processing, only one of them will succeed and receive a
Future
, and another will see an exception thrown, indicating that the request was rejected.
有了这个自定义执行器服务,调用的程序员需要学习的东西就更少了。调用程序员不需要理解 TaskMaster
class 的语义,他们只需要理解常见的 ExecutorService
行为。
提示:AbstractExecutorService
class 可能会让您更轻松地创建自己的执行程序服务。