在等待未来时处理异步执行程序关闭

treating of asynchronous executor shutdown when waiting on future

我有一个 ssh 客户端库实现。每个连接都有几个执行者。一种是使用 ScheduledThreadPoolExecutor 的线程池,用于对短期任务和计时器进行排队。一个是read executor,用来持有一个packet receiver task。一个是write executor,串行执行任务,每个任务发送一个数据包到服务器。当然read和write executor都是单线程的,write executor是作为消息队列之类的东西使用的。

我遇到的问题是:对消息进行排队的方法,以及对任务进行排队的一些方法,return CompletableFuture。我用 CompletableFuture.runAsync 方法排队。但是,连接可能会以有序或强制的方式异步关闭。在这种情况下,使用 shutdownNow 方法关闭部分或所有池。

如果某些线程(包括那些池外的线程)可能会等待某些任务同步完成,并且由于包括网络错误在内的所有问题而存在异步 shutdownNow 的风险,该怎么办? shutdownNow 不发布未来的取消方法。我不关心实际任务是否被中断,我只关心如果执行者在他们的任务仍在队列中时关闭,期货将无限期阻塞。

处理这种情况的最佳做法是什么?人们 do/etc 做什么?

好的,我相信我有想法了。如下:

因为并行关闭会等待所有任务完成,而 shutdownNow 只会将它们丢弃而不取消,而且因为我实际上一直在使用可完成的期货,所以我决定维护一组各种可完成的期货连接,它将保存所有任务,包括消息发送者和提交到任务池的正常任务。关闭连接或开始有序断开连接的每种方法都将经过该集合并异常完成所有期货,但有一些例外。这比取消给出了更好的错误。如果任务以这种方式自行取消,也不会发生任何事情。

我没有使用 runAsync,或者在任务与可运行对象无关的情况下通常创建可完成的未来,而是使用一种特殊的方法来创建这样的任务,将其添加到集合中,并使用 [=18 附加一个函数=](),如果任务因任何原因完成,则将其从集合中删除。我还有 runAsync,它使用前面描述的方法创建任务,然后使用 CompletableFuture.completeAsync.

提交一个可运行的

这样一来,所有等待的线程都应该在连接关闭时解除阻塞,并从包括已发送消息在内的所有任务中获得一个很好的异常,无论我使用哪种方法等待完成,get() 或 join()。