关闭正在处理 Stream 的 ExecutorService
Shutting down ExecutorService that is processing a Stream
我正在开发一个处理大量项目并将每个结果发送到 REST 服务器的程序。该服务器的速率限制为每小时 2000 个请求,因此程序必须在处理两个项目之间暂停一段时间。处理失败的项目应该在之后呈现给用户。
一切正常,直到我发现程序关闭时 shutdownNow()
不工作。 UI 关闭但执行程序继续工作。下面简单总结一下代码。
ExecutorService exec = Executors.newSingleThreadExecutor();
SomeProcessor p = new SomeProcessor();
void process() {
exec.submit(() -> {
Stream<SomeObject> s = ...
List<SomeObject> failed = p.process(s);
// show failed in UI
};
}
void exit() {
exec.shutdownNow();
}
还有 SomeProcessor
class:
List<SomeObject> process(Stream<SomeObject> s) {
List<SomeObject> failed = s
.sequential()
.filter(o -> !ignore(o)) // ignore irrelevant items
.peek(o -> pause()) // make sure not to exceed server's rate limit
.filter(o -> !process(o)) // keep items failed to process
.collect(Collectors.asList());
return failed;
}
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
boolean process(SomeObject o) {
if (Thread.interrupted()) // make task interruptible
// *** but then what? ***
try {
// process o and send result to server
return true;
} catch (SomeException e) {
return false;
}
}
我猜 shutdownNow()
没有工作,因为任务不可中断。所以我试图让任务可中断,但我不知道它应该是什么样子。有什么想法吗?
还有一个额外的问题。 pause()
方法做了它应该做的事情。我仍然宁愿使用 ScheduledThreadPoolExecutor.scheduleAtFixedRate(.)
之类的东西,但随后会处理一系列任务。有这样的东西吗?
感谢您的帮助!
看看你的pause
方法:
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
此时您已经检测到中断,但通过再次设置线程的中断状态对其做出反应,或者至少尝试这样做,因为 Thread.interrupted()
应该 Thread.currentThread().interrupt()
来实现这个,如果你不想支持中断就好了,但是在这里它会适得其反。它将导致您的下一个 sleep
调用立即抛出一个 InterruptedException
,您以相同的方式处理它,依此类推。因此,您不仅在继续处理剩余的元素,而且在元素之间没有休眠。
当您将方法更改为
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
throw new IllegalStateException("interrupted");
}
}
中断将终止您使用 IllegalStateException
的流操作。为清楚起见,您可以为此场景定义自己的异常类型(扩展 RuntimeException
),以区别于所有其他异常类型。
我正在开发一个处理大量项目并将每个结果发送到 REST 服务器的程序。该服务器的速率限制为每小时 2000 个请求,因此程序必须在处理两个项目之间暂停一段时间。处理失败的项目应该在之后呈现给用户。
一切正常,直到我发现程序关闭时 shutdownNow()
不工作。 UI 关闭但执行程序继续工作。下面简单总结一下代码。
ExecutorService exec = Executors.newSingleThreadExecutor();
SomeProcessor p = new SomeProcessor();
void process() {
exec.submit(() -> {
Stream<SomeObject> s = ...
List<SomeObject> failed = p.process(s);
// show failed in UI
};
}
void exit() {
exec.shutdownNow();
}
还有 SomeProcessor
class:
List<SomeObject> process(Stream<SomeObject> s) {
List<SomeObject> failed = s
.sequential()
.filter(o -> !ignore(o)) // ignore irrelevant items
.peek(o -> pause()) // make sure not to exceed server's rate limit
.filter(o -> !process(o)) // keep items failed to process
.collect(Collectors.asList());
return failed;
}
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
boolean process(SomeObject o) {
if (Thread.interrupted()) // make task interruptible
// *** but then what? ***
try {
// process o and send result to server
return true;
} catch (SomeException e) {
return false;
}
}
我猜 shutdownNow()
没有工作,因为任务不可中断。所以我试图让任务可中断,但我不知道它应该是什么样子。有什么想法吗?
还有一个额外的问题。 pause()
方法做了它应该做的事情。我仍然宁愿使用 ScheduledThreadPoolExecutor.scheduleAtFixedRate(.)
之类的东西,但随后会处理一系列任务。有这样的东西吗?
感谢您的帮助!
看看你的pause
方法:
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
此时您已经检测到中断,但通过再次设置线程的中断状态对其做出反应,或者至少尝试这样做,因为 Thread.interrupted()
应该 Thread.currentThread().interrupt()
来实现这个,如果你不想支持中断就好了,但是在这里它会适得其反。它将导致您的下一个 sleep
调用立即抛出一个 InterruptedException
,您以相同的方式处理它,依此类推。因此,您不仅在继续处理剩余的元素,而且在元素之间没有休眠。
当您将方法更改为
void pause() {
try {
TimeUnit.MILLISECONDS.sleep(...);
} catch (final InterruptedException e) {
throw new IllegalStateException("interrupted");
}
}
中断将终止您使用 IllegalStateException
的流操作。为清楚起见,您可以为此场景定义自己的异常类型(扩展 RuntimeException
),以区别于所有其他异常类型。