ExecutorService:如何停止正在等待 IO 的线程
Executor service stop thread waiting for IO
我正在使用 Apache Commons IO 库从网络下载页面。
下面是程序的简化版本:
/**
 * Downloads a URL into the file {@code create} on the "first" pool while a watcher task on
 * the "second" thread waits for it; the main thread allows 3 seconds before cancelling.
 *
 * <p>NOTE: {@code Future.cancel(true)} only interrupts the worker thread. A worker that is
 * blocked inside a socket read does not react to that interrupt, so the "first" thread can
 * keep running after both executors have been shut down.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting for the watcher
 */
public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
    final ExecutorService service = Executors.newFixedThreadPool(2, r -> new Thread(r, "first"));
    final Future<Object> submit = executorService.submit(() -> {
        final Future<Object> create = service.submit(() -> {
            System.out.println("Start");
            // try-with-resources: the writer is flushed and both streams are released as
            // soon as the copy returns or throws.
            try (java.io.InputStream in = new URL("ny_url").openStream();
                 FileWriter out = new FileWriter(new File("create"))) {
                org.apache.commons.io.IOUtils.copy(in, out);
            }
            System.out.println("End");
            return null;
        });
        try {
            return create.get();
        } catch (InterruptedException exc) {
            System.out.println("Interrupted");
            create.cancel(true);
            // Restore the flag so the pool thread still sees that it was interrupted.
            Thread.currentThread().interrupt();
            return null;
        }
    });
    try {
        submit.get(3000, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (TimeoutException e) {
        System.out.println("Timeout");
        submit.cancel(true);
        System.out.println("HERE2");
    } finally {
        // Shut down on every path, including the RuntimeException above; otherwise the
        // non-daemon pool threads are never asked to stop.
        executorService.shutdown();
        service.shutdownNow();
    }
}
如你所见,我有两个执行器(executor):第一个负责下载网页,第二个等待下载完成(最多 3 秒)。如果任务无法在 3 秒内下载完页面,主线程就会发出中断,第二个执行器捕获该异常并调用 future.cancel(true)。
最后我关闭了这两个执行器,但第一个执行器的线程仍在运行。这是它的线程转储:
"first" #11 prio=5 os_prio=0 tid=0x00007f18b0002000 nid=0x527d runnable [0x00007f18de08a000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000d9004a60> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000d9004b20> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000d9006b60> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000d9006b88> (a sun.net.www.http.KeepAliveStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3444)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000d902a040> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.Reader.read(Reader.java:140)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1886)
at com.devadmin.nma.downloader.TestExecutor.lambda$null(TestExecutor.java:17)
所以我推测,第一个执行器中的任务对中断毫无感知,因为它正阻塞在 IO 上;问题在于 Apache Commons 在复制流时不会检查 Thread.isInterrupted()。
是否可以在不修改该库的情况下停止这个线程?谢谢。
我读了这篇博客:https://techblog.bozho.net/interrupting-executor-tasks/
它给了我一个解决思路。
下面是我的程序现在的样子:
// Downloads a URL into the file "create" on the "first" pool while a watcher task on the
// "second" thread waits for it; the main thread allows 3 seconds before cancelling.
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
final ExecutorService service = Executors.newFixedThreadPool(2, r -> new Thread(r, "first"));
// The stream is opened before the tasks are submitted so that the watcher task holds a
// reference to it and can close it from outside the download task.
final InputStream inputStream = new URL("https://ep.t8cdn.com/201804/17/47295201/mp4_ultra_47295201.mp4?validfrom=1586656571&validto=1586663771&rate=94k&burst=1600k&ip=66.183.216.58&hash=FXdFmkw14K1PuL9lc8GzOjzcaeI%3D").openStream();
final Future<Object> submit = executorService.submit(() -> {
    final Future<Object> create = service.submit(() -> {
        System.out.println("Start");
        // Copy bytes, not characters: the payload is binary, and going through a Writer
        // would decode it with the platform charset. try-with-resources flushes and
        // closes the file on every path.
        try (java.io.OutputStream out = new java.io.FileOutputStream(new File("create"))) {
            IOUtils.copy(inputStream, out);
        }
        System.out.println("End");
        return null;
    });
    try {
        return create.get();
    } catch (InterruptedException exc) {
        System.out.println("Interrupted");
        create.cancel(true);
        // Restore the flag so the pool thread still sees that it was interrupted.
        Thread.currentThread().interrupt();
        return null;
    } finally {
        // Closing the stream is what makes the blocked read in the download task fail and
        // return; doing it in finally also releases the connection on success and on error.
        inputStream.close();
    }
});
try {
    submit.get(3000, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
} catch (TimeoutException e) {
    System.out.println("Timeout");
    submit.cancel(true);
    System.out.println("HERE2");
} finally {
    // Shut down on every path, including the RuntimeException above.
    executorService.shutdownNow();
    service.shutdownNow();
}
我的做法是:在把任务提交给第一个执行器之前先创建输入流,然后在第二个执行器的任务中关闭它。关闭流会使阻塞中的读操作抛出异常,从而让下载线程结束。
我正在使用 Apache Commons IO 库从网络下载页面。下面是程序的简化版本:
/**
 * Downloads a URL into the file {@code create} on the "first" pool while a watcher task on
 * the "second" thread waits for it; the main thread allows 3 seconds before cancelling.
 *
 * <p>NOTE: {@code Future.cancel(true)} only interrupts the worker thread. A worker that is
 * blocked inside a socket read does not react to that interrupt, so the "first" thread can
 * keep running after both executors have been shut down.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting for the watcher
 */
public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
    final ExecutorService service = Executors.newFixedThreadPool(2, r -> new Thread(r, "first"));
    final Future<Object> submit = executorService.submit(() -> {
        final Future<Object> create = service.submit(() -> {
            System.out.println("Start");
            // try-with-resources: the writer is flushed and both streams are released as
            // soon as the copy returns or throws.
            try (java.io.InputStream in = new URL("ny_url").openStream();
                 FileWriter out = new FileWriter(new File("create"))) {
                org.apache.commons.io.IOUtils.copy(in, out);
            }
            System.out.println("End");
            return null;
        });
        try {
            return create.get();
        } catch (InterruptedException exc) {
            System.out.println("Interrupted");
            create.cancel(true);
            // Restore the flag so the pool thread still sees that it was interrupted.
            Thread.currentThread().interrupt();
            return null;
        }
    });
    try {
        submit.get(3000, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    } catch (TimeoutException e) {
        System.out.println("Timeout");
        submit.cancel(true);
        System.out.println("HERE2");
    } finally {
        // Shut down on every path, including the RuntimeException above; otherwise the
        // non-daemon pool threads are never asked to stop.
        executorService.shutdown();
        service.shutdownNow();
    }
}
如你所见,我有两个执行器(executor):第一个负责下载网页,第二个等待下载完成(最多 3 秒)。如果任务无法在 3 秒内下载完页面,主线程就会发出中断,第二个执行器捕获该异常并调用 future.cancel(true)。
最后我关闭了这两个执行器,但第一个执行器的线程仍在运行。这是它的线程转储:
"first" #11 prio=5 os_prio=0 tid=0x00007f18b0002000 nid=0x527d runnable [0x00007f18de08a000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000d9004a60> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000d9004b20> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000d9006b60> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000d9006b88> (a sun.net.www.http.KeepAliveStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3444)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000d902a040> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.Reader.read(Reader.java:140)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1886)
at com.devadmin.nma.downloader.TestExecutor.lambda$null(TestExecutor.java:17)
所以我推测,第一个执行器中的任务对中断毫无感知,因为它正阻塞在 IO 上;问题在于 Apache Commons 在复制流时不会检查 Thread.isInterrupted()。
是否可以在不修改该库的情况下停止这个线程?谢谢。
我读了这篇博客:https://techblog.bozho.net/interrupting-executor-tasks/ ,它给了我一个解决思路。下面是我的程序现在的样子:
// Downloads a URL into the file "create" on the "first" pool while a watcher task on the
// "second" thread waits for it; the main thread allows 3 seconds before cancelling.
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, "second"));
final ExecutorService service = Executors.newFixedThreadPool(2, r -> new Thread(r, "first"));
// The stream is opened before the tasks are submitted so that the watcher task holds a
// reference to it and can close it from outside the download task.
final InputStream inputStream = new URL("https://ep.t8cdn.com/201804/17/47295201/mp4_ultra_47295201.mp4?validfrom=1586656571&validto=1586663771&rate=94k&burst=1600k&ip=66.183.216.58&hash=FXdFmkw14K1PuL9lc8GzOjzcaeI%3D").openStream();
final Future<Object> submit = executorService.submit(() -> {
    final Future<Object> create = service.submit(() -> {
        System.out.println("Start");
        // Copy bytes, not characters: the payload is binary, and going through a Writer
        // would decode it with the platform charset. try-with-resources flushes and
        // closes the file on every path.
        try (java.io.OutputStream out = new java.io.FileOutputStream(new File("create"))) {
            IOUtils.copy(inputStream, out);
        }
        System.out.println("End");
        return null;
    });
    try {
        return create.get();
    } catch (InterruptedException exc) {
        System.out.println("Interrupted");
        create.cancel(true);
        // Restore the flag so the pool thread still sees that it was interrupted.
        Thread.currentThread().interrupt();
        return null;
    } finally {
        // Closing the stream is what makes the blocked read in the download task fail and
        // return; doing it in finally also releases the connection on success and on error.
        inputStream.close();
    }
});
try {
    submit.get(3000, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
} catch (TimeoutException e) {
    System.out.println("Timeout");
    submit.cancel(true);
    System.out.println("HERE2");
} finally {
    // Shut down on every path, including the RuntimeException above.
    executorService.shutdownNow();
    service.shutdownNow();
}
我的做法是:在把任务提交给第一个执行器之前先创建输入流,然后在第二个执行器的任务中关闭它。关闭流会使阻塞中的读操作抛出异常,从而让下载线程结束。