您如何在块到达时使用 java.net.http 读取和打印分块的 HTTP 响应?
How do you read and print a chunked HTTP response using java.net.http as chunks arrive?
Java 11 引入了一个新包,java.net.http
,用于发出 HTTP 请求。对于一般用法,它非常简单。
我的问题是:我如何使用 java.net.http
来处理客户端收到每个块时的分块响应?
java.http.net
包含一个反应式 BodySubscriber
,这似乎是我想要的,但我找不到它如何使用的示例。
http_get_demo.py
下面是一个 python 实现,当块到达时打印块,我想用 java.net.http 做同样的事情:
import argparse
import requests
def main(url: str):
with requests.get(url, stream=True) as r:
for c in r.iter_content(chunk_size=1):
print(c.decode("UTF-8"), end="")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Read from a URL and print as text as chunks arrive")
parser.add_argument('url', type=str, help="A URL to read from")
args = parser.parse_args()
main(args.url)
HttpGetDemo.java
为了完整起见,这里有一个使用 java.net.http 发出阻塞请求的简单示例:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;
public class HttpGetDemo {
public static void main(String[] args) throws Exception {
var request = HttpRequest.newBuilder()
.uri(URI.create(args[0]))
.build();
var bodyHandler = HttpResponse.BodyHandlers
.ofString();
var client = HttpClient.newHttpClient();
var response = client.send(request, bodyHandler);
System.out.println(response.body());
}
}
HttpAsyncGetDemo.java
下面是发出 non-blocking/async 请求的示例:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;
/**
* ReadChunked
*/
public class HttpAsyncGetDemo {
public static void main(String[] args) throws Exception {
var request = HttpRequest.newBuilder()
.uri(URI.create(args[0]))
.build();
var bodyHandler = HttpResponse.BodyHandlers
.ofString();
var client = HttpClient.newHttpClient();
client.sendAsync(request, bodyHandler)
.thenApply(HttpResponse::body)
.thenAccept(System.out::println)
.join();
}
}
您可以打印 ByteBuffer
,但不能保证 ByteBuffer
对应于一个块。块由堆栈处理。将为每个块推送一个 ByteBuffer
切片 - 但如果缓冲区中没有足够的 space 剩余,则将推送部分块。消费者看到的只是一个包含数据的 ByteBuffer
流。
因此,您可以做的是在 ByteBuffer
出现时打印它们,但您不能保证它们与服务器发送的每个块完全对应。
注意:如果您请求的 body 是基于文本的,那么您可以使用
BodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber)
自定义 Subscriber<String>
将打印每一行。
BodyHandlers.fromLineSubscriber
使用响应 headers 中指示的字符集执行将字节解码为字符的硬字,如果需要,缓冲字节直到它们可以被解码(如果 ByteBuffer 可能在编码序列的中间结束,则文本包含在多个字节上编码的字符),并在行边界处拆分它们。 Subscriber::onNext 方法将为文本中的每一行调用一次。有关详细信息,请参阅 https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#fromLineSubscriber(java.util.concurrent.Flow.Subscriber)。
python 代码无法确保一次 HTTP chunk 一个响应正文数据可用。它只是向应用程序提供少量数据,从而减少了应用程序级别消耗的内存量(它可以在堆栈的较低位置进行缓冲)。 Java 11 HTTP 客户端支持通过流式处理程序之一进行流式传输,HttpResponse.BodyHandlers
:ofInputStream
、ofByteArrayConsumer
、asLines
等
或者编写您自己的处理程序/订阅者,如下所示:
https://www.youtube.com/watch?v=qiaC0QMLz5Y
感谢@pavel 和@chegar999 的部分回答。他们引导我找到了解决方案。
概览
我想出的解决方案如下。基本上,解决方案是使用自定义 java.net.http.HttpResponse.BodySubscriber
。 BodySubscriber 包含响应式方法(onSubscribe、onNext、onError 和 onComplete)和一个基本上 returns a java CompletableFuture 的 getBody 方法,最终将生成 HTTP 请求的主体。一旦你有了 BodySubscriber,你就可以像这样使用它:
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.build();
return client.sendAsync(request, responseInfo -> new StringSubscriber())
.whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
.thenApply(HttpResponse::body);
注意这一行:
client.sendAsync(request, responseInfo -> new StringSubscriber())
这是我们注册自定义 BodySubscriber 的地方;在这种情况下,我的自定义 class 被命名为 StringSubscriber
.
CustomSubscriber.java
这是一个完整的工作示例。使用 Java 11,您可以 运行 它而无需编译。只需将其传递到名为 CustomSubscriber.java
的文件中,然后 运行 命令 java CustomSubscriber <some url>
。它在到达时打印每个块的内容。当响应完成时,它还会收集它们并 returns 它们作为正文。
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;
public class CustomSubscriber {
public static void main(String[] args) {
CustomSubscriber cs = new CustomSubscriber();
String body = cs.get(args[0]).join();
System.out.println("--- Response body:\n: ..." + body + "...");
}
public CompletableFuture<String> get(String uri) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.build();
return client.sendAsync(request, responseInfo -> new StringSubscriber())
.whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
.thenApply(HttpResponse::body);
}
static class StringSubscriber implements BodySubscriber<String> {
final CompletableFuture<String> bodyCF = new CompletableFuture<>();
Flow.Subscription subscription;
List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();
@Override
public CompletionStage<String> getBody() {
return bodyCF;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Request first item
}
@Override
public void onNext(List<ByteBuffer> buffers) {
System.out.println("-- onNext " + buffers);
try {
System.out.println("\tBuffer Content:\n" + asString(buffers));
}
catch (Exception e) {
System.out.println("\tUnable to print buffer content");
}
buffers.forEach(ByteBuffer::rewind); // Rewind after reading
responseData.addAll(buffers);
subscription.request(1); // Request next item
}
@Override
public void onError(Throwable throwable) {
bodyCF.completeExceptionally(throwable);
}
@Override
public void onComplete() {
bodyCF.complete(asString(responseData));
}
private String asString(List<ByteBuffer> buffers) {
return new String(toBytes(buffers), StandardCharsets.UTF_8);
}
private byte[] toBytes(List<ByteBuffer> buffers) {
int size = buffers.stream()
.mapToInt(ByteBuffer::remaining)
.sum();
byte[] bs = new byte[size];
int offset = 0;
for (ByteBuffer buffer : buffers) {
int remaining = buffer.remaining();
buffer.get(bs, offset, remaining);
offset += remaining;
}
return bs;
}
}
}
尝试一下
要测试此解决方案,您需要一个服务器来发送使用 Transfer-encoding: chunked
的响应,并且发送速度足够慢以观察数据块的到达。我在 https://github.com/hohonuuli/demo-chunk-server 创建了一个,但你可以像这样使用 Docker 旋转它:
docker run -p 8080:8080 hohonuuli/demo-chunk-server
然后 运行 CustomSubscriber.java 代码使用 java CustomSubscriber.java http://localhost:8080/chunk/10
现在有一个新的 Java 库可以满足此类要求
RxSON:https://github.com/rxson/rxson
它利用 JsonPath 和 RxJava 在响应到达后立即从响应中读取 JSON 流块,并将它们解析为 java 个对象。
示例:
String serviceURL = "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json";
HttpRequest req = HttpRequest.newBuilder(URI.create(serviceURL)).GET().build();
RxSON rxson = new RxSON.Builder().build();
String jsonPath = "$[*].Airport.Name";
Flowable<String> airportStream = rxson.create(String.class, req, jsonPath);
airportStream
.doOnNext(it -> System.out.println("Received new item: " + it))
//Just for test
.toList()
.blockingGet();
Java 11 引入了一个新包,java.net.http
,用于发出 HTTP 请求。对于一般用法,它非常简单。
我的问题是:我如何使用 java.net.http
来处理客户端收到每个块时的分块响应?
java.http.net
包含一个反应式 BodySubscriber
,这似乎是我想要的,但我找不到它如何使用的示例。
http_get_demo.py
下面是一个 python 实现,当块到达时打印块,我想用 java.net.http 做同样的事情:
import argparse
import requests
def main(url: str):
with requests.get(url, stream=True) as r:
for c in r.iter_content(chunk_size=1):
print(c.decode("UTF-8"), end="")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Read from a URL and print as text as chunks arrive")
parser.add_argument('url', type=str, help="A URL to read from")
args = parser.parse_args()
main(args.url)
HttpGetDemo.java
为了完整起见,这里有一个使用 java.net.http 发出阻塞请求的简单示例:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;
public class HttpGetDemo {
public static void main(String[] args) throws Exception {
var request = HttpRequest.newBuilder()
.uri(URI.create(args[0]))
.build();
var bodyHandler = HttpResponse.BodyHandlers
.ofString();
var client = HttpClient.newHttpClient();
var response = client.send(request, bodyHandler);
System.out.println(response.body());
}
}
HttpAsyncGetDemo.java
下面是发出 non-blocking/async 请求的示例:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;
/**
* ReadChunked
*/
public class HttpAsyncGetDemo {
public static void main(String[] args) throws Exception {
var request = HttpRequest.newBuilder()
.uri(URI.create(args[0]))
.build();
var bodyHandler = HttpResponse.BodyHandlers
.ofString();
var client = HttpClient.newHttpClient();
client.sendAsync(request, bodyHandler)
.thenApply(HttpResponse::body)
.thenAccept(System.out::println)
.join();
}
}
您可以打印 ByteBuffer
,但不能保证 ByteBuffer
对应于一个块。块由堆栈处理。将为每个块推送一个 ByteBuffer
切片 - 但如果缓冲区中没有足够的 space 剩余,则将推送部分块。消费者看到的只是一个包含数据的 ByteBuffer
流。
因此,您可以做的是在 ByteBuffer
出现时打印它们,但您不能保证它们与服务器发送的每个块完全对应。
注意:如果您请求的 body 是基于文本的,那么您可以使用
BodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber)
自定义 Subscriber<String>
将打印每一行。
BodyHandlers.fromLineSubscriber
使用响应 headers 中指示的字符集执行将字节解码为字符的硬字,如果需要,缓冲字节直到它们可以被解码(如果 ByteBuffer 可能在编码序列的中间结束,则文本包含在多个字节上编码的字符),并在行边界处拆分它们。 Subscriber::onNext 方法将为文本中的每一行调用一次。有关详细信息,请参阅 https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#fromLineSubscriber(java.util.concurrent.Flow.Subscriber)。
python 代码无法确保一次 HTTP chunk 一个响应正文数据可用。它只是向应用程序提供少量数据,从而减少了应用程序级别消耗的内存量(它可以在堆栈的较低位置进行缓冲)。 Java 11 HTTP 客户端支持通过流式处理程序之一进行流式传输,HttpResponse.BodyHandlers
:ofInputStream
、ofByteArrayConsumer
、asLines
等
或者编写您自己的处理程序/订阅者,如下所示: https://www.youtube.com/watch?v=qiaC0QMLz5Y
感谢@pavel 和@chegar999 的部分回答。他们引导我找到了解决方案。
概览
我想出的解决方案如下。基本上,解决方案是使用自定义 java.net.http.HttpResponse.BodySubscriber
。 BodySubscriber 包含响应式方法(onSubscribe、onNext、onError 和 onComplete)和一个基本上 returns a java CompletableFuture 的 getBody 方法,最终将生成 HTTP 请求的主体。一旦你有了 BodySubscriber,你就可以像这样使用它:
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.build();
return client.sendAsync(request, responseInfo -> new StringSubscriber())
.whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
.thenApply(HttpResponse::body);
注意这一行:
client.sendAsync(request, responseInfo -> new StringSubscriber())
这是我们注册自定义 BodySubscriber 的地方;在这种情况下,我的自定义 class 被命名为 StringSubscriber
.
CustomSubscriber.java
这是一个完整的工作示例。使用 Java 11,您可以 运行 它而无需编译。只需将其传递到名为 CustomSubscriber.java
的文件中,然后 运行 命令 java CustomSubscriber <some url>
。它在到达时打印每个块的内容。当响应完成时,它还会收集它们并 returns 它们作为正文。
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;
public class CustomSubscriber {
public static void main(String[] args) {
CustomSubscriber cs = new CustomSubscriber();
String body = cs.get(args[0]).join();
System.out.println("--- Response body:\n: ..." + body + "...");
}
public CompletableFuture<String> get(String uri) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.build();
return client.sendAsync(request, responseInfo -> new StringSubscriber())
.whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
.thenApply(HttpResponse::body);
}
static class StringSubscriber implements BodySubscriber<String> {
final CompletableFuture<String> bodyCF = new CompletableFuture<>();
Flow.Subscription subscription;
List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();
@Override
public CompletionStage<String> getBody() {
return bodyCF;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Request first item
}
@Override
public void onNext(List<ByteBuffer> buffers) {
System.out.println("-- onNext " + buffers);
try {
System.out.println("\tBuffer Content:\n" + asString(buffers));
}
catch (Exception e) {
System.out.println("\tUnable to print buffer content");
}
buffers.forEach(ByteBuffer::rewind); // Rewind after reading
responseData.addAll(buffers);
subscription.request(1); // Request next item
}
@Override
public void onError(Throwable throwable) {
bodyCF.completeExceptionally(throwable);
}
@Override
public void onComplete() {
bodyCF.complete(asString(responseData));
}
private String asString(List<ByteBuffer> buffers) {
return new String(toBytes(buffers), StandardCharsets.UTF_8);
}
private byte[] toBytes(List<ByteBuffer> buffers) {
int size = buffers.stream()
.mapToInt(ByteBuffer::remaining)
.sum();
byte[] bs = new byte[size];
int offset = 0;
for (ByteBuffer buffer : buffers) {
int remaining = buffer.remaining();
buffer.get(bs, offset, remaining);
offset += remaining;
}
return bs;
}
}
}
尝试一下
要测试此解决方案,您需要一个服务器来发送使用 Transfer-encoding: chunked
的响应,并且发送速度足够慢以观察数据块的到达。我在 https://github.com/hohonuuli/demo-chunk-server 创建了一个,但你可以像这样使用 Docker 旋转它:
docker run -p 8080:8080 hohonuuli/demo-chunk-server
然后 运行 CustomSubscriber.java 代码使用 java CustomSubscriber.java http://localhost:8080/chunk/10
现在有一个新的 Java 库可以满足此类要求 RxSON:https://github.com/rxson/rxson 它利用 JsonPath 和 RxJava 在响应到达后立即从响应中读取 JSON 流块,并将它们解析为 java 个对象。
示例:
String serviceURL = "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json";
HttpRequest req = HttpRequest.newBuilder(URI.create(serviceURL)).GET().build();
RxSON rxson = new RxSON.Builder().build();
String jsonPath = "$[*].Airport.Name";
Flowable<String> airportStream = rxson.create(String.class, req, jsonPath);
airportStream
.doOnNext(it -> System.out.println("Received new item: " + it))
//Just for test
.toList()
.blockingGet();