Spring Executorservice 错误处理
Spring Executorservice Error-Handling
我实现了 Spring-TaskExecutor(相当于 JDK 1.5 的执行器。)来处理从外部系统接收的通知通知。
接口只有一种方法:
public interface AsynchronousService {
void executeAsynchronously(Runnable task);
}
以及相应的实现:
public class AsynchronousServiceImpl implements AsynchronousService {
private TaskExecutor taskExecutor;
@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}
@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
Xml-任务执行器的配置(遗留应用程序):
<bean id="taskExecutor" class="org.example.impl.NotificationPool">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<property name="queueCapacity" value="100"/>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
为 corePoolSize 和 maxPoolSize 都设置了 1,因为我希望任务按顺序执行(处理任务的池只创建 1 个线程)。
我想根据收到通知的日期对我的任务进行排序,因此我需要覆盖此函数以允许优先排序:
public class NotificationPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}
这是通知任务class:
public class NotificationTask implements Runnable, Comparable<NotificationTask> {
private final NotificationService notificationService;
private final Notification notification;
public NotificationService(NotificationService notificationService,
Notification notification) {
this.notificationService = notificationService;
this.notification = notification;
}
@Override
public int compareTo(NotificationTask task) {
return notification.getTimestamp().compareTo(task.getTimestamp());
}
@Override
public void run() {
notificationService.processNotification(notification);
}
}
这就是我执行它的方式:
asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));
现在,如果出现问题怎么办,或者您怎么知道出现问题了?例如,如果其中一项任务抛出异常?你怎么处理的?如果抛出异常,我想记录。
我发现一篇文章 (https://ewirch.github.io/2013/12/a-executor-is-not-a-thread.html) 建议覆盖 afterExecute()
- class 的方法:ThreadPoolExecutor
。但是,我目前正在使用 Spring 的 ThreadPoolTaskExecutor
,它没有 Java 的 ThreadPoolExecutor
的 beforeExecute()
和 afterExecute()
回调方法].
我可以扩展 ThreadPoolTaskExecutor
并覆盖 initializeExecutor()
方法并创建自定义 ThreadPoolExecutor
的实例。但问题是 initializeExecutor
方法使用了 ThreadPoolTaskExecutor
.
的私有字段
有人有更好的主意或更好的方法吗?
For an alternative, you may set up a ThreadPoolExecutor instance directly using constructor injection, or use a factory method definition that points to the Executors class. To expose such a raw Executor as a Spring TaskExecutor, simply wrap it with a ConcurrentTaskExecutor adapter.
但我没有看到任何与 ThreadPoolExecutor
相关的构造函数,我们可以在其中注入,所以这可能是一个神话,或者他们已经删除了该功能。
幸运的是我们有ThreadPoolExecutorFactoryBean
来拯救:
If you prefer native ExecutorService exposure instead, consider ThreadPoolExecutorFactoryBean as an alternative to this class.
这个执行器公开了一个我们可以自定义线程池的方法:
public class NotificationPool extends ThreadPoolExecutorFactoryBean {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
@Override
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new YourCustomThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
}
并覆盖默认的 afterExecute
回调:
public class YourCustomThreadPoolExecutor extends ThreadPoolExecutor {
public YourCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// Here do something with your exception
}
}
你可以像这样轻松使用。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
@Override
public void execute(@NotNull Runnable task) {
try {
super.execute(task);
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
};
我实现了 Spring-TaskExecutor(相当于 JDK 1.5 的执行器。)来处理从外部系统接收的通知通知。
接口只有一种方法:
public interface AsynchronousService {
void executeAsynchronously(Runnable task);
}
以及相应的实现:
public class AsynchronousServiceImpl implements AsynchronousService {
private TaskExecutor taskExecutor;
@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}
@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
Xml-任务执行器的配置(遗留应用程序):
<bean id="taskExecutor" class="org.example.impl.NotificationPool">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<property name="queueCapacity" value="100"/>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
为 corePoolSize 和 maxPoolSize 都设置了 1,因为我希望任务按顺序执行(处理任务的池只创建 1 个线程)。
我想根据收到通知的日期对我的任务进行排序,因此我需要覆盖此函数以允许优先排序:
public class NotificationPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}
这是通知任务class:
public class NotificationTask implements Runnable, Comparable<NotificationTask> {
private final NotificationService notificationService;
private final Notification notification;
public NotificationService(NotificationService notificationService,
Notification notification) {
this.notificationService = notificationService;
this.notification = notification;
}
@Override
public int compareTo(NotificationTask task) {
return notification.getTimestamp().compareTo(task.getTimestamp());
}
@Override
public void run() {
notificationService.processNotification(notification);
}
}
这就是我执行它的方式:
asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));
现在,如果出现问题怎么办,或者您怎么知道出现问题了?例如,如果其中一项任务抛出异常?你怎么处理的?如果抛出异常,我想记录。
我发现一篇文章 (https://ewirch.github.io/2013/12/a-executor-is-not-a-thread.html) 建议覆盖 afterExecute()
- class 的方法:ThreadPoolExecutor
。但是,我目前正在使用 Spring 的 ThreadPoolTaskExecutor
,它没有 Java 的 ThreadPoolExecutor
的 beforeExecute()
和 afterExecute()
回调方法].
我可以扩展 ThreadPoolTaskExecutor
并覆盖 initializeExecutor()
方法并创建自定义 ThreadPoolExecutor
的实例。但问题是 initializeExecutor
方法使用了 ThreadPoolTaskExecutor
.
有人有更好的主意或更好的方法吗?
For an alternative, you may set up a ThreadPoolExecutor instance directly using constructor injection, or use a factory method definition that points to the Executors class. To expose such a raw Executor as a Spring TaskExecutor, simply wrap it with a ConcurrentTaskExecutor adapter.
但我没有看到任何与 ThreadPoolExecutor
相关的构造函数,我们可以在其中注入,所以这可能是一个神话,或者他们已经删除了该功能。
幸运的是我们有ThreadPoolExecutorFactoryBean
来拯救:
If you prefer native ExecutorService exposure instead, consider ThreadPoolExecutorFactoryBean as an alternative to this class.
这个执行器公开了一个我们可以自定义线程池的方法:
public class NotificationPool extends ThreadPoolExecutorFactoryBean {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
@Override
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new YourCustomThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
}
并覆盖默认的 afterExecute
回调:
public class YourCustomThreadPoolExecutor extends ThreadPoolExecutor {
public YourCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// Here do something with your exception
}
}
你可以像这样轻松使用。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
@Override
public void execute(@NotNull Runnable task) {
try {
super.execute(task);
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
};