Spring TaskExecutor无界队列

Spring TaskExecutor unbounded queue

我实现了 Spring-TaskExecutor(相当于 JDK 1.5 的执行器。)来处理从外部系统接收的通知通知。

接口只有一种方法:

 public interface AsynchronousService {
    void executeAsynchronously(Runnable task);
}

以及相应的实现:

public class AsynchronousServiceImpl implements AsynchronousService {

    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

Xml-任务执行器的配置(遗留应用):

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="1"/>
        <!--<property name="queueCapacity" value="100"/>-->
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

为 corePoolSize 和 maxPoolSize 都设置了 1,因为我希望任务按顺序执行(处理任务的池只创建 1 个线程)。

我想根据收到通知的日期对我的任务进行排序,因此我需要覆盖此功能以允许优先排序:

public class NotificationPool extends ThreadPoolTaskExecutor {
     @Override
     protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
          return new PriorityBlockingQueue<>(queueCapacity);
        }
    }

这是通知任务class:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {

    private final NotificationService notificationService;
    private final Notification notification;

    public NotificationService(NotificationService notificationService, 
                               Notification notification) {
        this.notificationService = notificationService;
        this.notification = notification;
    }

    @Override
    public int compareTo(NotificationTask task) {
        return notification.getTimestamp().compareTo(task.getTimestamp());
    }

    @Override
    public void run() {
        notificationService.processNotification(notification);
    }
}

这就是我执行它的方式:

asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));

这适用于有界队列,但我需要无界队列。 正如您在 xml-configuration 中看到的,定义队列容量的行被注释掉了:

<!--<property name="queueCapacity" value="100"/>-->

但是,如果我这样做,我会得到一个 OutOfMemoryException。它似乎试图在应用程序开始时创建一个无界队列。然而,Executor-Service 允许我们使用无界队列 - 但我不知道如何在这里做到这一点。

根据 java 文档:

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError)

所以基本上,它是无界的,你不需要关心队列大小。

但是,队列由数组支持:

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    // Here is the initialization of backed array
    this.queue = new Object[initialCapacity];
}

因此您需要提供合理的初始队列大小。之后,你就完成了。

但是,如上所述,随着越来越多的元素到来,如果内部数组已满,队列在尝试增长内部数组时可能会抛出 OutOfMemory 异常。

private void tryGrow(Object[] array, int oldCap)

但这不太可能,除非你的项目在短时间内产生数百万条通知