异步单任务执行器

Asynchronous single task executor

我怀疑下面的解决方案是否适用于以下情况:

下面是用于解决这个问题的代码:

    public class Processor {
        private final ExecutorService execService = Executors.newSingleThreadExecutor();
        private final Object monitor = new Object();
        private AtomicReference<Task> runningTask = new AtomicReference<>(null);
        
        public Optional<CompletableFuture<String>> processDataAsync(String data) {
            if (runningTask.get() != null)
                return Optional.empty();  //rejecting data process request because another request is already being served
            
            synchronized (monitor) {
                if (runningTask.get() != null)
                    return Optional.empty();
                
                CompletableFuture<String> f = new CompletableFuture<>();
                f.whenComplete((r, e) -> runningTask.set(null));  //when processing completes, another data process request can be accepted
                
                Task task = new Task(f, data);
                runningTask.set(task);
                execService.submit(task);
                return Optional.of(f);
            }
        }
    }   

TaskRunnable 如下:

    public class Task implements Runnable {
        private final CompletableFuture<String> result;
        private final String data;
        
        public Task(CompletableFuture<String> result, String data) {
            this.result = result;
            this.data = data;
        }
        
        @Override
        public void run() {
            String processingResult = processData(data);  //does some blocking stuff with data, returning result of processing
            result.complete(processingResult);
        }
    }

这里让我感到困惑的是 processDataAsync 中的同步(即阻塞)。我知道这里的阻塞很短而且并不重要,但是异步方法不应该总是在没有阻塞的情况下实现吗?如果是这样,我无法想象没有同步如何实现“单一处理”。

我认为您可以尝试使用从 Executors.newSingleThreadExecutor() 返回的 ExecutorService 的功能。在上述方法的 JavaDoc 中解释说:

/**
 * Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {/*...*/}

当您将 Runnable 提交到方法返回的 ExecutorService 时,它会被放入队列并仅在前一个完成后执行。我想这就是你需要的。

也许我误解了问题,但你似乎把情况复杂化了。与其跟踪任务,不如跟踪 ExecutorService#submit 返回的 FutureFuture 对象是您返回到正在执行的任务的系绳。

Future定义一个成员字段。

Future future ;

在请求处理时测试 Future。调用 Future#isDone 进行测试。 Javadoc 说:

Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

if( Objects.isNull( this.future ) || this.future.isDone() ) {
    … proceed with request to process data.
    this.future = executorService.submit( … ) ;
    return Optional.of( this.future ) ;
} else {
    … Refuse the request to process data.
    … Do *not* submit any task to the executor service. 
    return Optional.empty() ;
}

TaskMaster解决方案

在各种评论中,您提供了有关问题的更多详细信息。

您想从多个线程向单个对象提交任务。为了清楚起见,我们称该对象为 TaskMasterTaskMaster 实例跟踪其嵌套的执行程序服务当前是否正在处理任务。

  • 如果忙于处理任务,则拒绝任何传入的任务。该拒绝采用 Optional< Future >.
  • 的形式
  • 如果当前未处理任务,则接受招标任务,并将其分配给执行程序服务以立即执行。此接受采用已加载 Optional< Future >.
  • 的形式

由于上面讨论的代码将被跨线程访问,我们必须以thread-safe的方式保护Future future ;。一种简单的方法是在将任务提交给 TaskMaster.

的唯一方法上标记 synchronized
package singletask;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Runs one task at a time, rejecting any task tendered while already executing a task.
public class TaskMaster
{
    // Member fields.
    private final ExecutorService executorService;
    private Future future;

    // Constructor
    public TaskMaster ( )
    {
        this.executorService = Executors.newSingleThreadExecutor();
    }

    public synchronized Optional < Future > tender ( Runnable task )
    {
        if ( Objects.isNull( this.future ) || this.future.isDone() )
        {
            // Proceed with immediate execution of the tendered task.
            this.future = executorService.submit( task );
            return Optional.of( this.future );
        } else
        {
            // Already busy on a task. Reject this tendered task.
            return Optional.empty();
        }
    }

    public void shutdownAndAwaitTerminationOfExecutorService ( )
    {
        if ( Objects.isNull( this.executorService ) ) { return; }
        this.executorService.shutdown(); // Stop new tasks from being submitted.
        try
        {
            // Wait a while for existing tasks to terminate
            if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
            {
                this.executorService.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                    System.err.println( "Pool did not terminate." );
            }
        }
        catch ( InterruptedException ex )
        {
            // (Re-)Cancel if current thread also interrupted
            this.executorService.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

用法如下所示。 注意: 多线程调用 System.out.println do not 总是按时间顺序出现在控制台上。始终包含并检查时间戳以验证订单。

package singletask;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        Runnable task = ( ) -> {
            UUID taskID = UUID.randomUUID();
            System.out.println( "Starting task " + taskID + " at " + Instant.now() );
            // Pretend to do some long hard work, by sleeping.
            try { Thread.sleep( Duration.ofSeconds( 5 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
            System.out.println( "Ending task " + taskID + " at " + Instant.now() );
        };

        TaskMaster taskMaster = new TaskMaster();

        Optional < Future > f1 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
        try { Thread.sleep( Duration.ofMillis( 500 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
        System.out.println( "f1 = " + f1 );

        Optional < Future > f2 = taskMaster.tender( task ); // We expect rejection, showing `Optional.empty`.
        System.out.println( "f2 = " + f2 );

        try { Thread.sleep( Duration.ofSeconds( 7 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
        Optional < Future > f3 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
        System.out.println( "f3 = " + f3 );

        // Attempt a graceful shutwdown.
        taskMaster.shutdownAndAwaitTerminationOfExecutorService();
        System.out.println( "Demo ending at " + Instant.now() );
    }
}

当运行.

Starting task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:42:58.516852Z
f1 = Optional[java.util.concurrent.FutureTask@1fb3ebeb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31befd9f[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
f2 = Optional.empty
Ending task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:43:03.530138Z
Starting task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:06.011485Z
f3 = Optional[java.util.concurrent.FutureTask@816f27d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1218025c[Wrapped task = singletask.App$$Lambda/0x0000000800c01208@1c20c684]]]
Ending task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:11.013576Z
Demo ending at 2022-02-03T06:43:11.014180Z

自定义ExecutorService

虽然上面的 TaskMaster 代码有效,并提供了您要求的 Optional 对象,但我会推荐另一种方法。

您可以制作自己的 ExecutorService 版本。您的实施可以做一些类似于我们上面看到的事情,跟踪单个任务的执行。

与其返回 Optional< Future >,更正统的方法是提供一个 submit 方法实现:

  • Returns a Future 如果招标任务可以立即执行,或者...
  • 抛出 RejectedExecutionException 因为任务已经 运行ning。

此行为在 ExecutorService 的 Javadoc 中定义。您对该自定义执行程序服务的招标任务的任何方法都会捕获此异常,而不是检查 Optional.

换句话说,修改您评论的摘录:

If two users simultaneously try to request data processing, only one of them will succeed and receive a Future, and another will see an exception thrown, indicating that the request was rejected.

有了这个自定义执行器服务,调用的程序员需要学习的东西就更少了。调用程序员不需要理解 TaskMaster class 的语义,他们只需要理解常见的 ExecutorService 行为。

提示:AbstractExecutorService class 可能会让您更轻松地创建自己的执行程序服务。