Is there a way to prevent ClosedByInterruptException?
In the following example, I have one file being used by two threads (in the real example I could have any number of threads).
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        new File(name).deleteOnExit();
        RandomAccessFile raf = new RandomAccessFile(name, "rw");
        FileChannel fc = raf.getChannel();

        Thread monitor = new Thread(() -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        });
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });
        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        running = false;
        raf.close();
    }
}
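Rather than creating a RandomAccessFile and a memory mapping for each thread, I have one file and one memory mapping shared between the threads. There is a catch, though: if any thread is interrupted, the resource is closed.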
delete.me is 0 KB
delete.me is 2 KB
delete.me is 4 KB
delete.me is 6 KB
delete.me is 8 KB
Interrupted
Monitor thread died
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
    at A.lambda$main$0(A.java:19)
    at java.lang.Thread.run(Thread.java:748)
Writer thread died
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
    at A.lambda$main$1(A.java:41)
    at java.lang.Thread.run(Thread.java:748)
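Is there any way to prevent the FileChannel from being closed just because one of the threads using it was interrupted?
EDIT: What I want to avoid doing is the following, as I suspect it won't work on Java 9+: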
private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class
                .getDeclaredField("interruptor");
        field.setAccessible(true);
        field.set(fc, (Interruptible) thread
                -> Jvm.warn().on(getClass(), fc + " not closed on interrupt"));
    } catch (Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}
By the way, with the above hack in place, the call to fc.size() returns the size as expected.
You can use reflection to access the interruptor field illegally, and get the sun.nio.ch.Interruptible class type from it in order to create a proxy instance:
private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor");
        Class<?> interruptibleClass = field.getType();
        field.setAccessible(true);
        field.set(fc, Proxy.newProxyInstance(
                interruptibleClass.getClassLoader(),
                new Class[] { interruptibleClass },
                new InterruptibleInvocationHandler()));
    } catch (final Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

public class InterruptibleInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        // TODO: Check method and handle accordingly
        return null;
    }
}
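The TODO in the handler above matters more than it looks: a proxy also routes hashCode, equals and toString through invoke, and returning null for a method with a primitive return type makes the proxy throw a NullPointerException. Below is a minimal sketch of a handler that dispatches on the method name. FakeInterruptible is a hypothetical stand-in for the JDK-internal sun.nio.ch.Interruptible (assumed here to declare a single interrupt(Thread) method), and the class and field names are made up for illustration, so the sketch runs without touching any internals.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

final class IgnoreInterruptProxyDemo {
    /** Hypothetical stand-in for the JDK-internal sun.nio.ch.Interruptible. */
    interface FakeInterruptible {
        void interrupt(Thread thread);
    }

    /** Swallows interrupt(Thread) calls and answers the Object methods sensibly. */
    static final class IgnoringHandler implements InvocationHandler {
        final AtomicInteger ignored = new AtomicInteger();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "interrupt":
                    ignored.incrementAndGet(); // deliberately do nothing else
                    return null;               // void method, so null is fine here
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "equals":
                    return proxy == args[0];
                case "toString":
                    return "IgnoringHandler proxy";
                default:
                    throw new UnsupportedOperationException(method.toString());
            }
        }
    }

    /** Creates a proxy, calls interrupt twice, and returns how many calls were swallowed. */
    static int demo() {
        IgnoringHandler handler = new IgnoringHandler();
        FakeInterruptible proxy = (FakeInterruptible) Proxy.newProxyInstance(
                FakeInterruptible.class.getClassLoader(),
                new Class<?>[] { FakeInterruptible.class },
                handler);
        proxy.interrupt(Thread.currentThread());
        proxy.interrupt(Thread.currentThread());
        return handler.ignored.get();
    }

    public static void main(String[] args) {
        System.out.println("ignored interrupt calls: " + demo());
    }
}
```

In the real hack the interface would come from field.getType() as shown above; only the dispatch inside invoke carries over.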
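In Java 9, this works with only a single warning, because Java 9 runs with --illegal-access=permit by default.
However, this flag might be removed in future versions, and the best way to make sure this keeps working long-term is to use the --add-opens flag: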
--add-opens java.base/sun.nio.ch=your-module
--add-opens java.base/java.nio.channels.spi=your-module
Or, if you are not working with modules (not recommended):
--add-opens java.base/sun.nio.ch=ALL-UNNAMED
--add-opens java.base/java.nio.channels.spi=ALL-UNNAMED
This works for Java 9, Java 10, and the current JDK 11 Early-Access build (28 (2018/8/23)).
By using an AsynchronousFileChannel, the ClosedByInterruptException is never thrown. It just doesn't seem to care about the interrupt.
Tested with jdk 1.8.0_72.
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        Path path = new File(name).toPath();
        AtomicLong position = new AtomicLong(0);

        AsynchronousFileChannel fc = AsynchronousFileChannel.open(path,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.DELETE_ON_CLOSE,
                StandardOpenOption.READ, StandardOpenOption.WRITE,
                StandardOpenOption.SYNC);

        CompletionHandler<Integer, Object> handler =
                new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                //System.out.println(attachment + " completed with " + result + " bytes written");
                position.getAndAdd(result);
            }
            @Override
            public void failed(Throwable e, Object attachment) {
                System.err.println(attachment + " failed with:");
                e.printStackTrace();
            }
        };

        Runnable monitorRun = () -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                        System.out.println("Interrupt call failed so return");
                        return;
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        };

        Thread monitor = new Thread(monitorRun);
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb, position.get(), null, handler);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (Exception e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });
        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        // Start a second monitor to show the channel is still usable after the interrupt.
        monitor = new Thread(monitorRun);
        monitor.start();
        Thread.sleep(5000);
        running = false;
        fc.close();
    }
}
This generates the following output:
delete.me is 0 KB
delete.me is 3 KB
delete.me is 6 KB
delete.me is 9 KB
delete.me is 12 KB
Interrupted
Interrupt call failed so return
delete.me is 21 KB
delete.me is 24 KB
delete.me is 27 KB
delete.me is 30 KB
delete.me is 33 KB
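The same observation can be condensed into a single-threaded check. This is only a sketch, assuming a temp file (the "async-demo" prefix and all class and method names are made up): the calling thread sets its own interrupt flag, uses the AsynchronousFileChannel, and then reports whether the channel is still open. The flag is cleared before Future.get(), because get() itself would otherwise throw InterruptedException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class AsyncChannelInterruptDemo {
    /** Returns true if the channel is still open after being used by an interrupted thread. */
    static boolean staysOpenWhenInterrupted() {
        try {
            Path path = Files.createTempFile("async-demo", ".bin");
            path.toFile().deleteOnExit();
            try (AsynchronousFileChannel fc = AsynchronousFileChannel.open(
                    path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                Thread.currentThread().interrupt();   // simulate an interrupted caller
                long before = fc.size();              // the question's FileChannel died on this kind of call
                Future<Integer> pending = fc.write(ByteBuffer.wrap(new byte[32]), 0);
                Thread.interrupted();                 // clear the flag so Future.get() can block
                int written = pending.get();
                System.out.println("size before: " + before + ", bytes written: " + written);
                return fc.isOpen();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("still open: " + staysOpenWhenInterrupted());
    }
}
```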
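Since you said you want "one memory mapping shared between threads", there is no such problem at all, because a memory mapping is not affected by the closing of the FileChannel. In fact, closing the channel as soon as possible is a good strategy for reducing the resources held by the application.
E.g.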
// assumes: import static java.nio.file.StandardOpenOption.*;
static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    MappedByteBuffer mapped;
    // The channel is only needed to create the mapping and is closed right away.
    try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) {
        mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
    }
    Thread thread1 = new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            byte[] b = new byte[5];
            mapped.position(4000);
            mapped.get(b);
            System.out.println("read "+new String(b, StandardCharsets.US_ASCII));
        }
    });
    thread1.setDaemon(true);
    thread1.start();
    Thread thread2 = new Thread(() -> {
        byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            mapped.position(4000);
            mapped.put(b);
            System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII));
            // rotate the bytes so that every write is distinguishable
            byte b1 = b[0];
            System.arraycopy(b, 1, b, 0, b.length-1);
            b[b.length-1] = b1;
        }
        mapped.force();
    });
    thread2.setDaemon(true);
    thread2.start();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    thread2.interrupt();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    running = false;
}
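This demonstrates how the threads can read and write their data after the channel has been closed, and that interrupting the writing thread does not stop the reading thread.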
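For a quick check without timing loops, here is a minimal single-threaded sketch of the same idea, assuming a temp file (the "mapped-demo" prefix and all class and method names are made up). It maps the file, closes the channel, sets the interrupt flag, and then still writes and reads through the mapping. One thing the sketch does differently on purpose: each access goes through its own duplicate() view, because a buffer's position is not safe to share between threads, while the duplicates share the same mapped memory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedBufferSurvivesDemo {
    /** Writes and reads "HELLO" through a mapping whose channel is already closed. */
    static String roundTripAfterCloseAndInterrupt() {
        MappedByteBuffer mapped;
        try {
            Path path = Files.createTempFile("mapped-demo", ".bin");
            path.toFile().deleteOnExit();
            try (FileChannel fc = FileChannel.open(
                    path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                mapped = fc.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            } // channel closed here, mapping stays valid
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Thread.currentThread().interrupt(); // an interrupt has no channel left to close
        try {
            byte[] out = "HELLO".getBytes(StandardCharsets.US_ASCII);
            ByteBuffer writerView = mapped.duplicate(); // independent position, same memory
            writerView.position(4000);
            writerView.put(out);

            ByteBuffer readerView = mapped.duplicate();
            readerView.position(4000);
            byte[] in = new byte[out.length];
            readerView.get(in);
            return new String(in, StandardCharsets.US_ASCII);
        } finally {
            Thread.interrupted(); // clear the flag again
        }
    }

    public static void main(String[] args) {
        System.out.println("read back: " + roundTripAfterCloseAndInterrupt());
    }
}
```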
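If you need to perform FileChannel operations in addition to the memory-mapped I/O, there is no problem with using multiple FileChannel instances, so that closing one channel does not affect the other. E.g.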
static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE);
        FileChannel fc2 = FileChannel.open(name,READ,WRITE)) {
        Thread thread1 = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            try {
                MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                while(running && !Thread.interrupted()) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                    byte[] b = new byte[5];
                    mapped.position(4000);
                    mapped.get(b);
                    System.out.println("read from map "
                        +new String(b, StandardCharsets.US_ASCII)
                        +", file size "+fc1.size());
                }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread1.setDaemon(true);
        thread1.start();
        Thread thread2 = new Thread(() -> {
            byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
            try {
                MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                fc2.position(4096);
                try {
                    while(running && !Thread.interrupted()) {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                        mapped.position(4000);
                        mapped.put(b);
                        System.out.println("wrote to mapped "
                            +new String(b, StandardCharsets.US_ASCII));
                        byte b1 = b[0];
                        System.arraycopy(b, 1, b, 0, b.length-1);
                        b[b.length-1] = b1;
                        fc2.write(ByteBuffer.wrap(b));
                    }
                } finally { mapped.force(); }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread2.setDaemon(true);
        thread2.start();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        thread2.interrupt();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
        running = false;
    }
}
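Here, the interruption of one thread does close its channel, but it does not affect the other one. Furthermore, even though each thread acquires its own MappedByteBuffer from its own channel, changes show through to the other thread, even without calling force(). Of course, the latter is defined to be system-dependent behavior and is not guaranteed to work on every system.
But as shown in the first example, you can still create a shared buffer from just one of the channels at the start, while performing the I/O operations on different channels, one per thread. It does not matter whether channels get closed, or which ones: the mapped buffer is not affected.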
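The isolation between channels can also be checked deterministically. The following is a sketch under assumptions (temp file, made-up class and method names): the calling thread sets its interrupt flag and writes through the first channel, which per the documented InterruptibleChannel contract closes that channel and throws ClosedByInterruptException. It then clears the flag and writes through the second channel, which was opened on the same file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class PerThreadChannelDemo {
    /** Returns { first channel was closed by the interrupt, second channel is still open }. */
    static boolean[] interruptClosesOnlyTheChannelInUse() {
        try {
            Path path = Files.createTempFile("two-channels", ".bin");
            path.toFile().deleteOnExit();
            try (FileChannel fc1 = FileChannel.open(
                         path, StandardOpenOption.READ, StandardOpenOption.WRITE);
                 FileChannel fc2 = FileChannel.open(
                         path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                boolean closedByInterrupt = false;
                Thread.currentThread().interrupt();   // simulate an interrupted thread
                try {
                    fc1.write(ByteBuffer.wrap(new byte[32]));
                } catch (ClosedByInterruptException expected) {
                    closedByInterrupt = true;
                } finally {
                    Thread.interrupted();             // the exception leaves the flag set; clear it
                }
                fc2.write(ByteBuffer.wrap(new byte[32])); // the other channel keeps working
                return new boolean[] { closedByInterrupt && !fc1.isOpen(), fc2.isOpen() };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = interruptClosesOnlyTheChannelInUse();
        System.out.println("fc1 closed by interrupt: " + r[0] + ", fc2 still open: " + r[1]);
    }
}
```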