消息驱动 bean 中的长 运行 任务和超时

Long running tasks in message-driven beans and timeouts

我想使用 MDB 处理一些非常长的 运行 任务(数小时),但是 MDB 由 JMS 支持,它有一些超时(我认为在 tomEE 中大约 10 分钟)。

更糟糕的是,工作也很持久。

由于我无法捕获 MDB 上的超时异常(可以吗?),我正在考虑手动处理 JMS 使用者的选项。手动处理可以通过两种方式完成:同步或异步。

异步地,我将回退到实现 MessageListener 的无状态 bean,然后我将无法再次处理超时。

同步地,我想,我可以处理 catch 块中的超时,但是,我如何在 TomEE+ 中实现 instances/workers/whatever 池,它们正在监听队列,等待作业过程? (记住,这里的timeout不是等待消息出现在队列中的时间,而是执行长运行任务的时间)

尝试将消费 JMS 消息和长 运行 处理任务分开

MDB 可以从队列中获取消息并调用无状态会话 bean Asynchronous Method Invocation

为什么不将冗长的 运行 进程分叉到一个单独的对象中? > 如果有办法控制这些单独对象的池大小,它可能是一种方法。你想尝试一个答案吗? :-)

您不希望不受控制的对象随时间增长?嗯...我知道你要去哪里了。

我唯一能想到的就是使用并发框架中的 FixedThreadPool。生成一个线程来执行长 运行 任务,并确保 MDB returns 在移交工作后立即。通过在测试环境中分析您的应用程序来控制线程池。

Weblogic 和 Websphere 等应用程序服务器为您提供了 WorkManager 等复杂框架来完成此类任务。

ManagedExecutorService exists in JEE7 to handle the execution of asynchronous tasks. By leveraging a Future interface implementation (see FutureTask) 任务也可以取消。以下是一个例子,不要复制它 1:1 并在生产中使用它。它在未来的地图中有内存泄漏,UUID 永远不会被清理。

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;

import javax.annotation.Resource;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AsyncTaskManager {

    @Resource
    private ManagedExecutorService mes;

    private Map<String, Future> futureByUUID = new ConcurrentHashMap<>();

    /**
     * Launch the provided Future implemented task.
     * @param runnable The runnable implementation to execute.
     * @param <R> The type of the task result returned.
     * @return The UUID that the executing ask is mapped to.
     */
    public <R> String launch (RunnableFuture<R> runnable) {
        String uuid = UUID.randomUUID().toString();
        mes.submit(runnable);
        futureByUUID.put(uuid, runnable);
        return uuid;
    }

    /**
     * Retrieve a future instance by it's UUID.
     * @param uuid The uuid of the future.
     * @param <T> The type of the future's result.
     * @return The future instance.
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> getByUUID (String uuid) {
        return (Future<T>)futureByUUID.get(uuid);
    }
}

有一整套接口和 类 可以结合使用以更改执行程序服务中任务的行为。例如,ManagedTask 接口提供了对其中一些行为的访问。

ManagedExecutorService 用于 @Asynchronous EJB 方法注释,如 MGolovanov 的回答中所述。任务创建和 Future 实例创建由 EJB 框架处理。