Java AsyncHttpClient: broken file while writing from LazyResponseBodyPart to AsynchronousFileChannel
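I use the AsyncHttpClient library for asynchronous, non-blocking requests.
My use case: write data to a file as it arrives over the network.
To download a file from a remote host and save it to disk, I used the default ResponseBodyPartFactory.EAGER together with AsynchronousFileChannel, so that the netty thread is not blocked while data arrives. But my measurements showed that, compared with LAZY, memory consumption in the Java heap is many times higher.
So I decided to switch straight to LAZY, without considering the consequences for the files.
This code helps reproduce the problem: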
    public static class AsyncChannelWriter {
        private final CompletableFuture<Integer> startPosition;
        private final AsynchronousFileChannel channel;
        public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
            this.channel = channel;
            this.startPosition = CompletableFuture.completedFuture((int) channel.size());
        }
        public CompletableFuture<Integer> getStartPosition() {
            return startPosition;
        }
        public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {
            return currentPosition.thenCompose(position -> {
                CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
                channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        writenBytes.complete(result);
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        writenBytes.completeExceptionally(exc);
                    }
                });
                return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
            });
        }
        public void close(CompletableFuture<Integer> currentPosition) {
            currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
        }
    }
    public static void main(String[] args) throws IOException {
        final String filepath = "/media/veracrypt4/files/1.jpg";
        final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
        final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
                .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
        final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
        final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
        final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
        client.prepareGet(downloadUrl)
                .execute(new AsyncCompletionHandler<Response>() {
                    @Override
                    public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
                        // With EAGER, getBodyByteBuffer() returns a HeapByteBuffer; with LAZY, a DirectByteBuffer.
                        final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                        final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                        final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                        atomicReferencePosition.set(newPosition);
                        return State.CONTINUE;
                    }
                    @Override
                    public Response onCompleted(Response response) {
                        asyncChannelWriter.close(atomicReferencePosition.get());
                        return response;
                    }
                });
    }
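In this case the picture is broken. But if I use FileChannel instead of AsynchronousFileChannel, the file comes out fine in both cases. Are there any nuances when using a DirectByteBuffer (which is what LazyResponseBodyPart.getBodyByteBuffer() returns) with AsynchronousFileChannel?
If everything works with EAGER, what could be wrong with my code?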
UPDATE
As I noticed, if I use LAZY and add, for example, the line Thread.sleep(10) to the onBodyPartReceived method, like this:
    @Override
    public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
        final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
        final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
        final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
        atomicReferencePosition.set(newPosition);
        Thread.sleep(10);
        return State.CONTINUE;
    }
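With this change, the file is saved to disk intact.
As I understand it, the reason is that during these 10 ms the asynchronous thread in AsynchronousFileChannel manages to write the data from this DirectByteBuffer to disk.
So the file is corrupted because the asynchronous thread uses this buffer for writing concurrently with the netty thread.
If we look at the source code of EagerResponseBodyPart, we see the following: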
    private final byte[] bytes;
    public EagerResponseBodyPart(ByteBuf buf, boolean last) {
        super(last);
        bytes = byteBuf2Bytes(buf);
    }
    @Override
    public ByteBuffer getBodyByteBuffer() {
        return ByteBuffer.wrap(bytes);
    }
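So when a piece of data arrives, it is immediately copied into a byte array. We can then safely wrap it in a HeapByteBuffer and hand it to the asynchronous thread of the file channel.
But if you look at the code of LazyResponseBodyPart: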
    private final ByteBuf buf;
    public LazyResponseBodyPart(ByteBuf buf, boolean last) {
        super(last);
        this.buf = buf;
    }
    @Override
    public ByteBuffer getBodyByteBuffer() {
        return buf.nioBuffer();
    }
As you can see, by calling nioBuffer we are effectively using the netty ByteBuf (in this case always a PooledSlicedByteBuf) inside the asynchronous file channel thread.
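The difference between the two body part classes can be illustrated with plain JDK buffers. This is only a minimal sketch, not AsyncHttpClient or Netty code: ViewVersusCopy and its methods are hypothetical names, and a direct ByteBuffer stands in for Netty's pooled memory, assuming the pool reuses that memory for the next chunk.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: a direct ByteBuffer plays the role of pooled network memory.
final class ViewVersusCopy {

    // EAGER-style: copy the readable bytes into a private heap array right away.
    static ByteBuffer eagerCopy(ByteBuffer source) {
        byte[] bytes = new byte[source.remaining()];
        source.duplicate().get(bytes); // duplicate() leaves the source position untouched
        return ByteBuffer.wrap(bytes);
    }

    // LAZY-style: hand out a view that shares memory with the source.
    static ByteBuffer lazyView(ByteBuffer source) {
        return source.duplicate();
    }

    // Decodes the buffer content without moving the buffer's own position.
    static String text(ByteBuffer buffer) {
        return StandardCharsets.US_ASCII.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer pooled = ByteBuffer.allocateDirect(4);
        pooled.put("AAAA".getBytes(StandardCharsets.US_ASCII)).flip();

        ByteBuffer copy = eagerCopy(pooled);
        ByteBuffer view = lazyView(pooled);

        // Simulate the pool reusing the same memory for the next chunk.
        pooled.clear();
        pooled.put("BBBB".getBytes(StandardCharsets.US_ASCII)).flip();

        System.out.println("copy: " + text(copy)); // copy: AAAA
        System.out.println("view: " + text(view)); // view: BBBB
    }
}
```

If an asynchronous writer were still holding the view when the memory is reused, it would write the second chunk's bytes at the first chunk's file position, which is consistent with the corruption described above.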
What can I do in this situation? How can I safely pass a DirectByteBuffer to the asynchronous thread without copying the buffer to the Java heap?
I talked with the maintainer of AsyncHttpClient.
The main problem was that I did not use the netty ByteBuf methods retain and release.
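The effect of retain and release can be sketched without Netty at all. The class below is a toy reference counter written for illustration: RefCountedChunk is a hypothetical name, not a Netty type, and Netty's real ByteBuf does considerably more. It only shows why one extra retain keeps the memory from being recycled until the asynchronous write has released it, assuming the caller of the callback releases its own reference once the callback returns.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a reference-counted buffer; NOT Netty's implementation.
final class RefCountedChunk {
    private final AtomicInteger refCount = new AtomicInteger(1); // the creator holds one reference
    private volatile boolean recycled;

    // Adds one reference so the chunk survives one more release().
    RefCountedChunk retain() {
        if (recycled) {
            throw new IllegalStateException("already recycled");
        }
        refCount.incrementAndGet();
        return this;
    }

    // Drops one reference; returns true when this call dropped the last one.
    boolean release() {
        int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released too many times");
        }
        if (remaining == 0) {
            recycled = true; // a real pool could now hand the memory to someone else
            return true;
        }
        return false;
    }

    boolean isRecycled() {
        return recycled;
    }

    public static void main(String[] args) {
        RefCountedChunk chunk = new RefCountedChunk();
        chunk.retain();                         // the handler keeps it for the async write
        chunk.release();                        // the caller drops its reference after the callback
        System.out.println(chunk.isRecycled()); // false: the writer still holds a reference
        chunk.release();                        // the write's completion handler releases
        System.out.println(chunk.isRecycled()); // true
    }
}
```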
In the end I came up with two solutions.
First: write the bytes to the file sequentially, tracking the position as a CompletableFuture.
Define a wrapper class for AsynchronousFileChannel:
    import io.netty.buffer.ByteBuf;
    import lombok.extern.log4j.Log4j2;
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;
    @Log4j2
    public class AsyncChannelNettyByteBufWriter implements Closeable {
        // Future holding the file position at which the next write starts.
        private final AtomicReference<CompletableFuture<Long>> positionReference;
        private final AsynchronousFileChannel channel;
        public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel) {
            this.channel = channel;
            try {
                this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        public CompletableFuture<Long> write(ByteBuf byteBuffer) {
            // retain() keeps the pooled buffer alive until the write has completed.
            final ByteBuf byteBuf = byteBuffer.retain();
            // Each write is chained onto the previous one, so writes run one after another.
            return positionReference.updateAndGet(x -> x.thenCompose(position -> {
                final CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
                channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                    @Override
                    public void completed(Integer result, ByteBuf attachment) {
                        attachment.release();
                        writenBytes.complete(result);
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuf attachment) {
                        attachment.release();
                        log.error(exc);
                        writenBytes.completeExceptionally(exc);
                    }
                });
                return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
            }));
        }
        public void close() {
            // Close only after the last queued write has finished (or failed).
            positionReference.updateAndGet(x -> x.whenComplete((position, throwable) -> {
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }));
        }
    }
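In fact, the AtomicReference may not be needed here if the writes happen on a single thread; if there are several threads, synchronization has to be taken seriously.
And the main usage: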
    public static void main(String[] args) throws IOException {
        final String filepath = "1.jpg";
        final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
        final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
                .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
        final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
        final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);
        client.prepareGet(downloadUrl)
                .execute(new AsyncCompletionHandler<Response>() {
                    @Override
                    public State onBodyPartReceived(HttpResponseBodyPart content) {
                        final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
                        asyncChannelNettyByteBufWriter.write(byteBuf);
                        return State.CONTINUE;
                    }
                    @Override
                    public Response onCompleted(Response response) {
                        asyncChannelNettyByteBufWriter.close();
                        return response;
                    }
                });
    }
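The position-chaining idea of this first solution can also be tried with the JDK alone. The following is a minimal sketch under stated assumptions: ChainedWriter and demo are hypothetical names, heap buffers stand in for Netty's ByteBuf (so there is nothing to retain or release), and a synchronized method replaces the AtomicReference. Unlike the wrapper class above, it also re-issues the write if the channel reports fewer bytes written than the buffer contained, which the asynchronous channel API may in principle do.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only sketch of chaining each write onto the previous one.
final class ChainedWriter {
    private final AsynchronousFileChannel channel;
    // Future holding the file offset at which the next chunk starts.
    private CompletableFuture<Long> position = CompletableFuture.completedFuture(0L);

    ChainedWriter(AsynchronousFileChannel channel) {
        this.channel = channel;
    }

    // Queues a chunk; its write starts only after the previous one has finished.
    synchronized CompletableFuture<Long> write(ByteBuffer chunk) {
        position = position.thenCompose(offset -> writeFully(chunk, offset));
        return position;
    }

    // Writes the whole buffer, continuing if only part of it was accepted.
    private CompletableFuture<Long> writeFully(ByteBuffer chunk, long offset) {
        CompletableFuture<Long> done = new CompletableFuture<>();
        channel.write(chunk, offset, offset, new CompletionHandler<Integer, Long>() {
            @Override
            public void completed(Integer written, Long start) {
                long next = start + written;
                if (chunk.hasRemaining()) {
                    channel.write(chunk, next, next, this);
                } else {
                    done.complete(next);
                }
            }

            @Override
            public void failed(Throwable error, Long start) {
                done.completeExceptionally(error);
            }
        });
        return done;
    }

    // Writes the chunks in order to a temp file and returns the file content.
    static String demo(String... chunks) {
        Path file = null;
        try {
            file = Files.createTempFile("chained", ".bin");
            try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.WRITE)) {
                ChainedWriter writer = new ChainedWriter(channel);
                CompletableFuture<Long> last = CompletableFuture.completedFuture(0L);
                for (String chunk : chunks) {
                    last = writer.write(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
                }
                last.join(); // wait for the tail of the chain before closing the channel
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) {
                file.toFile().delete();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("abc", "defg", "hi")); // prints abcdefghi
    }
}
```

Because only one write is in flight at a time, this variant needs no thread-pool tuning, at the cost of never overlapping disk writes.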
Second solution: track the position based on the size of the received bytes.
    public static void main(String[] args) throws IOException {
        final String filepath = "1.jpg";
        final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
        final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
                .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
        // Bounded pool, so the channel cannot spawn an unbounded number of writer threads.
        final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);
        client.prepareGet(downloadUrl)
                .execute(new AsyncCompletionHandler<Response>() {
                    private long position = 0;
                    @Override
                    public State onBodyPartReceived(HttpResponseBodyPart content) {
                        // retain() keeps the pooled buffer alive until the write has completed.
                        final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
                        long currentPosition = position;
                        position += byteBuf.readableBytes();
                        channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                            @Override
                            public void completed(Integer result, ByteBuf attachment) {
                                attachment.release();
                                // Caveat: with several pool threads, earlier writes may still be in flight here.
                                if (content.isLast()) {
                                    try {
                                        channel.close();
                                    } catch (IOException e) {
                                        throw new UncheckedIOException(e);
                                    }
                                }
                            }
                            @Override
                            public void failed(Throwable exc, ByteBuf attachment) {
                                attachment.release();
                                try {
                                    channel.close();
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }
                        });
                        return State.CONTINUE;
                    }
                    @Override
                    public Response onCompleted(Response response) {
                        return response;
                    }
                });
    }
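In the second solution, because we do not wait for the bytes to be written to the file, AsynchronousFileChannel can create a lot of threads (on Linux, because Linux does not implement non-blocking asynchronous file IO; on Windows the situation is much better).
My measurements showed that when writing to a slow USB flash drive the number of threads can reach tens of thousands, so you need to limit the number of threads by creating an ExecutorService and passing it to AsynchronousFileChannel.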
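The bounded-pool variant can likewise be tried with the JDK alone. This is a sketch under assumptions, not the code from the post: BoundedPositionalWriter and demo are hypothetical names, heap buffers replace Netty's ByteBuf, and a CountDownLatch is used to close the channel only after every write has reported back, instead of closing it from the handler of the last part.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only sketch: positions are computed up front, writes run on a bounded pool.
final class BoundedPositionalWriter {

    static String demo(int threads, String... chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads); // caps the writer threads
        Path file = null;
        try {
            file = Files.createTempFile("positional", ".bin");
            CountDownLatch pending = new CountDownLatch(chunks.length);
            AtomicReference<Throwable> failure = new AtomicReference<>();
            try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                    file, EnumSet.of(StandardOpenOption.WRITE), pool)) {
                long position = 0;
                for (String chunk : chunks) {
                    ByteBuffer buffer = ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8));
                    long start = position;
                    position += buffer.remaining(); // the next chunk starts right after this one
                    channel.write(buffer, start, start, new CompletionHandler<Integer, Long>() {
                        @Override
                        public void completed(Integer written, Long offset) {
                            if (buffer.hasRemaining()) {
                                long next = offset + written; // partial write: continue where it stopped
                                channel.write(buffer, next, next, this);
                            } else {
                                pending.countDown();
                            }
                        }

                        @Override
                        public void failed(Throwable error, Long offset) {
                            failure.compareAndSet(null, error);
                            pending.countDown();
                        }
                    });
                }
                pending.await(); // close only after every write has reported back
            }
            if (failure.get() != null) {
                throw new IllegalStateException(failure.get());
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            if (file != null) {
                file.toFile().delete();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, "abc", "defg", "hi")); // prints abcdefghi
    }
}
```

Since every write carries its own precomputed position, the order in which the pool threads finish does not affect the resulting file.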
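Are there clear advantages and disadvantages of the first solution versus the second? It is hard for me to say. Maybe someone can tell me which one is more efficient.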