使用 Spring Webflux 恢复文件下载,并在 Spring 中提供静态文件
Resume file downloading with Spring Webflux and static files serving in Spring
我相信整个互联网上都没有人回答这个问题,因为它可能非常复杂,但我会继续问下去。
基本上,我想在多个 Spring 应用程序之间进行交叉通信。他们每个人都以静态方式提供资源,here is the link for that topic. This serving is capitalized by other application instances that can download those files on request (for now I am transferring files via HTTP). I was able to download files thanks to the Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor 所以问题。
现在我想提升我的代码,以便在出现连接问题或应用程序在给定超时时间内暂时不可用时,我能够恢复 文件下载.我将 WebClient
超时配置为 this article.
现在,我认为这样的代码实际上可以让我处理暂时不可用的服务:
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileData = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
)
.flatMapMany(Function.identity());
DataBufferUtils
.write(fileData , fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnError(e -> !(e instanceof ChannelException), e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnError(ChannelException.class, e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnTerminate(() -> {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
})
.blockLast();
但显然,每当我终止以以下开头的第二个应用程序实例时,我都会收到一大堆错误:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4 } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
以及稍后在同一堆栈跟踪中:
2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4 } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
异常本身并不是什么大问题,但关键是我的应用程序重新启动后我的下载没有恢复。
所以,是的,我的问题是,我怎样才能恢复下载以及should/could我如何处理此处的异常情况?
您好,以下是我找到的实现 Spring-webflux-file-upload-download
基于它,下面是我的文件下载代码:
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.io.File;
import java.io.IOException;
@RestController
@RequestMapping("/")
public class DownloadContorller {
@GetMapping
public Mono<Void> downloadByWriteWith(ServerHttpResponse response) throws IOException {
ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=large-file.so");
response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);
Resource resource = new ClassPathResource("large-file.so");
File file = resource.getFile();
return zeroCopyResponse.writeWith(file, 0, file.length());
}
}
我已经使用 wget -c http://localhost:8080/
对其进行了测试,我能够从中断的地方恢复 下载。我通过
测试了它
- 在终端上从客户端 (Ctrl+C) 中断
- 正在停止应用程序然后重新启动它。
希望对您有所帮助。
代码有 2 个问题。首先是确定文件大小的方法。 AtomicLong
的正确使用方法与 DataBufferUtils.write()
相同。第二个问题是远程调用之间的 retry()
。在处理具有固定退避的远程调用时,使用 retryWhen()
似乎是一种很好的方式。总结一下,这是使我能够在 connection/application 问题
的情况下从正确的字节恢复文件下载的代码
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
AtomicLong fileSize = new AtomicLong(targetPath.toFile().length());
Flux<DataBuffer> fileDataStream = webClient
.get()
.uri(remoteXFerServiceTargetHost)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", fileSize.get()))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class);
DataBufferUtils
.write(fileData , fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.retryWhen(Retry.any().fixedBackoff(Duration.ofSeconds(5)).retryMax(5))
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnError(e -> !(e instanceof ChannelException), e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnError(ChannelException.class, e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnTerminate(() -> {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
})
.blockLast();
我相信整个互联网上都没有人回答这个问题,因为它可能非常复杂,但我会继续问下去。
基本上,我想在多个 Spring 应用程序之间进行交叉通信。他们每个人都以静态方式提供资源,here is the link for that topic. This serving is capitalized by other application instances that can download those files on request (for now I am transferring files via HTTP). I was able to download files thanks to the Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor 所以问题。
现在我想提升我的代码,以便在出现连接问题或应用程序在给定超时时间内暂时不可用时,我能够恢复 文件下载.我将 WebClient
超时配置为 this article.
现在,我认为这样的代码实际上可以让我处理暂时不可用的服务:
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileData = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
)
.flatMapMany(Function.identity());
DataBufferUtils
.write(fileData , fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnError(e -> !(e instanceof ChannelException), e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnError(ChannelException.class, e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnTerminate(() -> {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
})
.blockLast();
但显然,每当我终止以以下开头的第二个应用程序实例时,我都会收到一大堆错误:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4 } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
以及稍后在同一堆栈跟踪中:
2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4 } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
异常本身并不是什么大问题,但关键是我的应用程序重新启动后我的下载没有恢复。
所以,是的,我的问题是,我怎样才能恢复下载以及should/could我如何处理此处的异常情况?
您好,以下是我找到的实现 Spring-webflux-file-upload-download
基于它,下面是我的文件下载代码:
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.io.File;
import java.io.IOException;
@RestController
@RequestMapping("/")
public class DownloadContorller {
@GetMapping
public Mono<Void> downloadByWriteWith(ServerHttpResponse response) throws IOException {
ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=large-file.so");
response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);
Resource resource = new ClassPathResource("large-file.so");
File file = resource.getFile();
return zeroCopyResponse.writeWith(file, 0, file.length());
}
}
我已经使用 wget -c http://localhost:8080/
对其进行了测试,我能够从中断的地方恢复 下载。我通过
- 在终端上从客户端 (Ctrl+C) 中断
- 正在停止应用程序然后重新启动它。
希望对您有所帮助。
代码有 2 个问题。首先是确定文件大小的方法。 AtomicLong
的正确使用方法与 DataBufferUtils.write()
相同。第二个问题是远程调用之间的 retry()
。在处理具有固定退避的远程调用时,使用 retryWhen()
似乎是一种很好的方式。总结一下,这是使我能够在 connection/application 问题
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
AtomicLong fileSize = new AtomicLong(targetPath.toFile().length());
Flux<DataBuffer> fileDataStream = webClient
.get()
.uri(remoteXFerServiceTargetHost)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", fileSize.get()))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class);
DataBufferUtils
.write(fileData , fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.retryWhen(Retry.any().fixedBackoff(Duration.ofSeconds(5)).retryMax(5))
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnError(e -> !(e instanceof ChannelException), e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnError(ChannelException.class, e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnTerminate(() -> {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
})
.blockLast();