Spring WebClient: How to stream large byte[] to file?
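It seems the Spring RestTemplate isn't able to stream a response directly to a file without buffering it all in memory. What is the proper way to achieve this using the newer Spring 5 WebClient?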
WebClient client = WebClient.create("https://example.com");
client.get().uri(".../{name}", name).accept(MediaType.APPLICATION_OCTET_STREAM)
....?
I see people have found a few workarounds/hacks for this issue with RestTemplate, but I'm more interested in doing it the proper way with the WebClient.
There are many examples of using RestTemplate to download binary data, but almost all of them load the byte[] into memory.
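To make the distinction concrete, here is a minimal JDK-only sketch (no Spring involved) of what "streaming to a file" means, as opposed to loading a byte[] first: the payload passes through one small reusable buffer. The class name `StreamCopySketch`, the method `copyInChunks`, and the 8 KiB buffer size are illustrative assumptions, and a `ByteArrayInputStream` merely stands in for a response body.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class StreamCopySketch {

    /** Copies the stream to the file through one reusable buffer; returns the bytes written. */
    static long copyInChunks(InputStream in, Path target, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize]; // the only payload memory held by this method
        long total = 0;
        try (OutputStream out = Files.newOutputStream(target)) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                total += read;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("stream-copy", ".bin");
        try {
            // Stand-in for a large HTTP response body (hypothetical data).
            InputStream body = new ByteArrayInputStream(new byte[1_000_000]);
            long written = copyInChunks(body, target, 8192);
            System.out.println(written + " bytes written, file size " + Files.size(target));
        } finally {
            Files.deleteIfExists(target);
        }
    }
}
```

The point of the question is to get the same constant-memory behaviour out of an HTTP client instead of a plain `InputStream`.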
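I cannot test whether the following code actually avoids buffering the contents of the webClient payload in memory. Nevertheless, I think you should start from there: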
// assumes static imports of StandardOpenOption.CREATE and StandardOpenOption.WRITE
public Mono<Void> testWebClientStreaming() throws IOException {
    Flux<DataBuffer> stream = webClient
            .get().accept(MediaType.APPLICATION_OCTET_STREAM)
            .retrieve()
            .bodyToFlux(DataBuffer.class);
    Path filePath = Paths.get("filename");
    // CREATE added so that open() does not fail when the file does not exist yet
    AsynchronousFileChannel asynchronousFileChannel =
            AsynchronousFileChannel.open(filePath, CREATE, WRITE);
    return DataBufferUtils.write(stream, asynchronousFileChannel)
            // the written buffers are handed back to the caller, who must release them
            .doOnNext(DataBufferUtils.releaseConsumer())
            .doAfterTerminate(() -> {
                try {
                    asynchronousFileChannel.close();
                } catch (IOException ignored) { }
            }).then();
}
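A detail worth knowing when working with `AsynchronousFileChannel` directly: `open` does not create a missing file unless `CREATE` (or `CREATE_NEW`) is among the options, and every write needs an explicit file position. The following JDK-only sketch illustrates both points; the class and method names are hypothetical, and the sequential wait on each `Future` is a simplification for illustration, not a description of what Spring does internally.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

final class AsyncChannelSketch {

    /** Returns true if opening with WRITE alone fails because the file is missing. */
    static boolean writeOnlyFailsWhenMissing(Path missing) throws IOException {
        try (AsynchronousFileChannel ch =
                     AsynchronousFileChannel.open(missing, StandardOpenOption.WRITE)) {
            return false;
        } catch (NoSuchFileException expected) {
            return true;
        }
    }

    /** Writes the chunks one after another, advancing the file position manually. */
    static long writeChunks(Path target, List<byte[]> chunks)
            throws IOException, InterruptedException, ExecutionException {
        try (AsynchronousFileChannel ch = AsynchronousFileChannel.open(
                target, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long position = 0;
            for (byte[] chunk : chunks) {
                ByteBuffer buf = ByteBuffer.wrap(chunk);
                while (buf.hasRemaining()) {
                    // write() returns a Future; get() waits for this write only
                    position += ch.write(buf, position).get();
                }
            }
            return position;
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("async-channel-sketch");
        Path file = dir.resolve("out.bin");
        System.out.println("WRITE alone fails on a missing file: " + writeOnlyFailsWhenMissing(file));
        long written = writeChunks(file, Arrays.asList(
                "first ".getBytes(StandardCharsets.UTF_8),
                "second".getBytes(StandardCharsets.UTF_8)));
        System.out.println(written + " bytes written");
        Files.delete(file);
        Files.delete(dir);
    }
}
```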
I'm not sure whether you have access to RestTemplate in your current usage of Spring, but this one has worked for me.
RestTemplate restTemplate // = ...;
RequestCallback requestCallback = request -> request.getHeaders()
        .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));
// Streams the response
ResponseExtractor<Void> responseExtractor = response -> {
    // Here I write the response to a file but do what you like
    // (the target must be a local file path, not a URL)
    Path path = Paths.get("some/path");
    Files.copy(response.getBody(), path);
    return null;
};
// The URI needs a scheme, otherwise it is not absolute and the request fails
restTemplate.execute(URI.create("http://www.something.com"), HttpMethod.GET, requestCallback, responseExtractor);
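One thing to watch for with `Files.copy(InputStream, Path)`: it copies the stream to disk without materialising the whole body, but it throws `FileAlreadyExistsException` when the target already exists unless `REPLACE_EXISTING` is passed. A small JDK-only sketch of that behaviour follows; the class and method names are hypothetical, and a `ByteArrayInputStream` stands in for `response.getBody()`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class FilesCopySketch {

    /** Streams the body to the target, overwriting any previous download. */
    static long saveBody(InputStream body, Path target) throws IOException {
        return Files.copy(body, target, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Returns true if a plain Files.copy refuses to overwrite an existing target. */
    static boolean plainCopyRejectsExisting(InputStream body, Path existing) throws IOException {
        try {
            Files.copy(body, existing);
            return false;
        } catch (FileAlreadyExistsException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("download", ".bin"); // the file exists from here on
        try {
            byte[] payload = {1, 2, 3, 4};
            System.out.println("plain copy rejected: "
                    + plainCopyRejectsExisting(new ByteArrayInputStream(payload), target));
            System.out.println("bytes saved with REPLACE_EXISTING: "
                    + saveBody(new ByteArrayInputStream(payload), target));
        } finally {
            Files.deleteIfExists(target);
        }
    }
}
```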
Store the body to a temporary file and consume it
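// assumes static imports of Mono.using, DataBufferUtils.write,
// Files.createTempFile and Files.deleteIfExists
static <R> Mono<R> writeBodyToTempFileAndApply(
        final WebClient.ResponseSpec spec,
        final Function<? super Path, ? extends R> function) {
    return using(
            () -> createTempFile(null, null),
            // fromCallable defers function.apply until the write has completed;
            // thenReturn(function.apply(t)) would call it before any byte is written
            t -> write(spec.bodyToFlux(DataBuffer.class), t)
                    .then(Mono.<R>fromCallable(() -> function.apply(t))),
            t -> {
                try {
                    deleteIfExists(t);
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
    );
}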
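The resource handling in this approach (create a temp file, fill it, apply a function, always delete) does not depend on Reactor. As a rough blocking analogue using only the JDK, with an `InputStream` standing in for the response body and hypothetical class and method names, the ordering looks like this:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Function;

final class TempFileSketch {

    /** Writes the body to a temp file, applies the function to it, then deletes the file. */
    static <R> R writeToTempFileAndApply(
            InputStream body, Function<? super Path, ? extends R> function) throws IOException {
        Path temp = Files.createTempFile("body-", ".tmp");
        try {
            // createTempFile already created the file, hence REPLACE_EXISTING
            Files.copy(body, temp, StandardCopyOption.REPLACE_EXISTING);
            // the function runs only after the body has been written completely
            return function.apply(temp);
        } finally {
            Files.deleteIfExists(temp); // cleanup happens on success and on failure
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream body = new ByteArrayInputStream(new byte[12_345]); // hypothetical payload
        Long size = writeToTempFileAndApply(body, path -> path.toFile().length());
        System.out.println("function saw a file of " + size + " bytes");
    }
}
```

The function must not keep the `Path` for later use, since the file is gone once the method returns.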
Pipe the body and consume it
static <R> Mono<R> pipeBodyAndApply(
        final WebClient.ResponseSpec spec, final ExecutorService executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(
            Pipe::open,
            p -> {
                final Future<Disposable> future = executor.submit(
                        () -> write(spec.bodyToFlux(DataBuffer.class), p.sink())
                                .log()
                                .doFinally(s -> {
                                    try {
                                        p.sink().close();
                                        log.debug("p.sink closed");
                                    } catch (final IOException ioe) {
                                        throw new RuntimeException(ioe);
                                    }
                                })
                                .subscribe(DataBufferUtils.releaseConsumer())
                );
                return just(function.apply(p.source()))
                        .log()
                        .doFinally(s -> {
                            try {
                                final Disposable disposable = future.get();
                                assert disposable.isDisposed();
                            } catch (InterruptedException | ExecutionException e) {
                                e.printStackTrace();
                            }
                        });
            },
            p -> {
                try {
                    p.source().close();
                    log.debug("p.source closed");
                } catch (final IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
    );
}
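For readers unfamiliar with `java.nio.channels.Pipe`, here is a JDK-only sketch of the mechanism this approach relies on: one thread writes into the sink while another reads from the source, and closing the sink is what signals end-of-stream to the reader. The class and method names are hypothetical, and a byte array stands in for the response body.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PipeSketch {

    /** Drains the channel until end-of-stream and returns the number of bytes seen. */
    static long countBytes(ReadableByteChannel source) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(4096);
        long total = 0;
        int read;
        while ((read = source.read(buf)) != -1) {
            total += read;
            buf.clear(); // reuse the same small buffer for the next read
        }
        return total;
    }

    /** Writes the payload to the sink on another thread while the caller reads the source. */
    static long pipeAndCount(byte[] payload, ExecutorService executor) throws Exception {
        Pipe pipe = Pipe.open();
        Future<?> writer = executor.submit(() -> {
            // closing the sink (via try-with-resources) ends the stream for the reader
            try (Pipe.SinkChannel sink = pipe.sink()) {
                ByteBuffer buf = ByteBuffer.wrap(payload);
                while (buf.hasRemaining()) {
                    sink.write(buf);
                }
            }
            return null; // returning a value makes this a Callable, so IOException may propagate
        });
        try (Pipe.SourceChannel source = pipe.source()) {
            long total = countBytes(source);
            writer.get(); // surfaces any failure from the writer thread
            return total;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(pipeAndCount(new byte[1_000_000], executor) + " bytes piped");
        } finally {
            executor.shutdown();
        }
    }
}
```

The writer needs its own thread because a pipe holds only a limited amount of data, so a write blocks once the pipe is full and nobody is reading.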
With recent stable Spring WebFlux (5.2.4.RELEASE at the time of writing):
final WebClient client = WebClient.create("https://example.com");
final Flux<DataBuffer> dataBufferFlux = client.get()
        .accept(MediaType.TEXT_HTML)
        .retrieve()
        .bodyToFlux(DataBuffer.class); // the magic happens here
final Path path = FileSystems.getDefault().getPath("target/example.html");
DataBufferUtils
        .write(dataBufferFlux, path, CREATE_NEW)
        .block(); // only block here if the rest of your code is synchronous
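The non-obvious part for me was the bodyToFlux(DataBuffer.class), as it is currently mentioned in a generic section about streaming in Spring's documentation, with no direct reference to it in the WebClient section.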
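A side note on the `CREATE_NEW` option used in that snippet: with the standard JDK semantics, `CREATE_NEW` refuses to open a file that already exists, so running the same download twice against the same path would be expected to fail the second time. The JDK-only sketch below shows the difference from `CREATE` plus `TRUNCATE_EXISTING`; the class and method names are hypothetical, and it uses `Files.newByteChannel` rather than any Spring API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class OpenOptionSketch {

    /** Writes the bytes with the given options; returns false if an existing file was refused. */
    static boolean tryWrite(Path target, byte[] bytes, OpenOption... options) throws IOException {
        try (WritableByteChannel channel = Files.newByteChannel(target, options)) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("open-option-sketch");
        Path file = dir.resolve("example.html");
        byte[] body = "<html></html>".getBytes(StandardCharsets.UTF_8);

        // First run: the file does not exist yet, so CREATE_NEW succeeds.
        System.out.println(tryWrite(file, body,
                StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW));
        // Second run: the file exists now, so CREATE_NEW is refused.
        System.out.println(tryWrite(file, body,
                StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW));
        // CREATE + TRUNCATE_EXISTING overwrites instead.
        System.out.println(tryWrite(file, body, StandardOpenOption.WRITE,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING));

        Files.delete(file);
        Files.delete(dir);
    }
}
```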