使用 Java 11 HttpClient 读取分块数据
Using Java 11 HttpClient to read chunked data
我正在尝试使用 Java 11 的 java.net.http.HttpClient 从 Http 响应中读取分块数据,但我一次只能读取一行。我需要一次获取整个块。
这是我的代码:
final InputStream eventStream;
try {
HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
.newBuilder(
new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
.GET().build(), BodyHandlers.ofInputStream());
LOGGER.info("event stream HttpResponse received");
LOGGER.info("statusCode: {}", httpResponse.statusCode());
LOGGER.info("headers: {}", httpResponse.headers());
LOGGER.info("version: {}", httpResponse.version());
LOGGER.info("request: {}", httpResponse.request());
eventStream = httpResponse.body();
} catch (IOException | InterruptedException | URISyntaxException e) {
throw new RuntimeException("Unable to get status event stream", e);
}
BufferedReader br = new BufferedReader(new InputStreamReader(eventStream));
String line = "";
try {
while ((line = br.readLine()) != null) {
LOGGER.info("readLine(): {}", line);
}
} catch (IOException e) {
throw new RuntimeException("Unable to read status event stream", e);
}
但我收到的回复是单行的:
824 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:352 - starting to listen for thing script responses . . .
2018-10-15 20:43:34,057-0400 9107 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:362 - Thing event stream HttpResponse received
2018-10-15 20:43:34,058-0400 9108 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:363 - statusCode: 200
2018-10-15 20:43:34,059-0400 9109 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:364 - headers: java.net.http.HttpHeaders@a6764984 { {content-type=[application/json], date=[Tue, 16 Oct 2018 00:43:34 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:365 - version: HTTP_1_1
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:366 - request: http://thing-url.company.com:<port>/status/?pretty=true GET
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): {
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Header" : {
2018-10-15 20:43:34,064-0400 9114 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "header",
2018-10-15 20:43:34,066-0400 9116 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "version" : 1.2
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,068-0400 9118 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,100-0400 9150 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #1 of 10
2018-10-15 20:43:34,111-0400 9161 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-15 20:43:34,188-0400 9238 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #2 of 10
2018-10-15 20:43:34,191-0400 9241 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Group" : {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "groupStart",
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "groupId" : "57b94dcc-fad8-40f2-bb86-6d6894f26f26",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestInfo" : [ {
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } ]
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,245-0400 9295 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #3 of 10
2018-10-15 20:43:34,246-0400 9296 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-15 20:43:34,246-0400 9296 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Event" : {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "jobStart",
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,248-0400 9298 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "stats" : {
2018-10-15 20:43:34,249-0400 9299 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "totalTasks" : 1,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "runningTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "completeTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "failedTasks" : 0
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): },
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "coreMsUsed" : -1,
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestInfo" : [ {
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } ]
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Event" : {
我意识到那是因为我正在使用 BufferedReader 并告诉它在我得到它时记录每一行,但我不知道如何将这些行作为块读取。
谁能帮我弄清楚如何以原子方式读取块,以便我可以将它们解析为 JSON,一次一个事件?
解决方案
必须使用 Json处理器来解析到达的 Json 流:
private void listenForEvents(final IMakeHttpRequests httpClient) {
assert httpClient != null : "httpClient cannot be null";
LOGGER.info("starting to listen for script responses . . .");
final InputStream eventStream;
final JsonParser parser;
try {
HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
.newBuilder(
new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
.GET().build(), BodyHandlers.ofInputStream());
eventStream = httpResponse.body();
} catch (IOException | InterruptedException | URISyntaxException e) {
throw new RuntimeException("Unable to get status event stream", e);
}
try {
parser = JsonUtils.getObjectMapper().getFactory().createParser(eventStream);
while (parser.nextToken() != null) {
final TreeNode tree = parser.readValueAsTree();
LOGGER.info("tree: {}", tree);
}
} catch (IOException e) {
throw new RuntimeException("Unable to parse event stream to Json", e);
}
}
2018-10-16 13:25:32,081-0400 12622 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:352 - starting to listen for script responses . . .
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:363 - Thing event stream HttpResponse received
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:364 - statusCode: 200
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:365 - headers: java.net.http.HttpHeaders@67b87c21 { {content-type=[application/json], date=[Tue, 16 Oct 2018 17:25:33 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:366 - version: HTTP_1_1
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:367 - request: http://<server>.company.com:<port>/status/?pretty=true GET
2018-10-16 13:25:32,819-0400 13360 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Header":{"name":"header","value":{"version":1.2}}}
2018-10-16 13:25:32,835-0400 13376 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #1 of 10
2018-10-16 13:25:32,843-0400 13384 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-16 13:25:33,074-0400 13615 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #2 of 10
2018-10-16 13:25:33,075-0400 13616 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-16 13:25:33,117-0400 13658 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #3 of 10
2018-10-16 13:25:33,117-0400 13658 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-16 13:25:33,237-0400 13778 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Group":{"name":"groupStart","value":{"groupId":"b205e673-2eb1-4352-b207-fd4917be292a","requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
2018-10-16 13:25:33,238-0400 13779 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Event":{"name":"jobStart","value":{"stats":{"totalTasks":1,"runningTasks":0,"completeTasks":0,"failedTasks":0},"coreMsUsed":-1,"requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
. . .
您可以使用 StringBuilder 来获得您想要的结果。
try {
StringBuilder sb = new StringBuilder();
while ((line = br.readLine()) != null) {
sb.append(line);
}
LOGGER.info("readLine(): {}", sb.toString());
} catch (IOException e) {
throw new RuntimeException("Unable to read status event stream", e);
}
似乎您正在逐行读取流,而不管块是如何传入的(BufferedReader 负责将数据重新构建为行)。但是,没有人保证单个块包含单个 json 记录。服务器可以根据其内部缓冲区对数据进行分块。
解析流时,您可以看看 Jackson Streaming API。编程模型有点复杂,但它可能适合您的需要。例子
https://www.baeldung.com/jackson-streaming-api or the default jre implementation https://docs.oracle.com/javaee/7/api/javax/json/stream/package-summary.html
我正在尝试使用 Java 11 的 java.net.http.HttpClient 从 Http 响应中读取分块数据,但我一次只能读取一行。我需要一次获取整个块。
这是我的代码:
final InputStream eventStream;
try {
HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
.newBuilder(
new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
.GET().build(), BodyHandlers.ofInputStream());
LOGGER.info("event stream HttpResponse received");
LOGGER.info("statusCode: {}", httpResponse.statusCode());
LOGGER.info("headers: {}", httpResponse.headers());
LOGGER.info("version: {}", httpResponse.version());
LOGGER.info("request: {}", httpResponse.request());
eventStream = httpResponse.body();
} catch (IOException | InterruptedException | URISyntaxException e) {
throw new RuntimeException("Unable to get status event stream", e);
}
BufferedReader br = new BufferedReader(new InputStreamReader(eventStream));
String line = "";
try {
while ((line = br.readLine()) != null) {
LOGGER.info("readLine(): {}", line);
}
} catch (IOException e) {
throw new RuntimeException("Unable to read status event stream", e);
}
但我收到的回复是单行的:
824 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:352 - starting to listen for thing script responses . . .
2018-10-15 20:43:34,057-0400 9107 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:362 - Thing event stream HttpResponse received
2018-10-15 20:43:34,058-0400 9108 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:363 - statusCode: 200
2018-10-15 20:43:34,059-0400 9109 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:364 - headers: java.net.http.HttpHeaders@a6764984 { {content-type=[application/json], date=[Tue, 16 Oct 2018 00:43:34 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:365 - version: HTTP_1_1
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:366 - request: http://thing-url.company.com:<port>/status/?pretty=true GET
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): {
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Header" : {
2018-10-15 20:43:34,064-0400 9114 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "header",
2018-10-15 20:43:34,066-0400 9116 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "version" : 1.2
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,068-0400 9118 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,100-0400 9150 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #1 of 10
2018-10-15 20:43:34,111-0400 9161 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-15 20:43:34,188-0400 9238 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #2 of 10
2018-10-15 20:43:34,191-0400 9241 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Group" : {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "groupStart",
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "groupId" : "57b94dcc-fad8-40f2-bb86-6d6894f26f26",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestInfo" : [ {
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } ]
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,245-0400 9295 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #3 of 10
2018-10-15 20:43:34,246-0400 9296 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-15 20:43:34,246-0400 9296 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Event" : {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "name" : "jobStart",
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "value" : {
2018-10-15 20:43:34,248-0400 9298 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "stats" : {
2018-10-15 20:43:34,249-0400 9299 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "totalTasks" : 1,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "runningTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "completeTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "failedTasks" : 0
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): },
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "coreMsUsed" : -1,
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestInfo" : [ {
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } ]
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): "Event" : {
我意识到那是因为我正在使用 BufferedReader 并告诉它在我得到它时记录每一行,但我不知道如何将这些行作为块读取。
谁能帮我弄清楚如何以原子方式读取块,以便我可以将它们解析为 JSON,一次一个事件?
解决方案
必须使用 Json处理器来解析到达的 Json 流:
private void listenForEvents(final IMakeHttpRequests httpClient) {
assert httpClient != null : "httpClient cannot be null";
LOGGER.info("starting to listen for script responses . . .");
final InputStream eventStream;
final JsonParser parser;
try {
HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
.newBuilder(
new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
.GET().build(), BodyHandlers.ofInputStream());
eventStream = httpResponse.body();
} catch (IOException | InterruptedException | URISyntaxException e) {
throw new RuntimeException("Unable to get status event stream", e);
}
try {
parser = JsonUtils.getObjectMapper().getFactory().createParser(eventStream);
while (parser.nextToken() != null) {
final TreeNode tree = parser.readValueAsTree();
LOGGER.info("tree: {}", tree);
}
} catch (IOException e) {
throw new RuntimeException("Unable to parse event stream to Json", e);
}
}
2018-10-16 13:25:32,081-0400 12622 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:352 - starting to listen for script responses . . .
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:363 - Thing event stream HttpResponse received
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:364 - statusCode: 200
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:365 - headers: java.net.http.HttpHeaders@67b87c21 { {content-type=[application/json], date=[Tue, 16 Oct 2018 17:25:33 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:366 - version: HTTP_1_1
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:367 - request: http://<server>.company.com:<port>/status/?pretty=true GET
2018-10-16 13:25:32,819-0400 13360 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Header":{"name":"header","value":{"version":1.2}}}
2018-10-16 13:25:32,835-0400 13376 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #1 of 10
2018-10-16 13:25:32,843-0400 13384 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-16 13:25:33,074-0400 13615 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #2 of 10
2018-10-16 13:25:33,075-0400 13616 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-16 13:25:33,117-0400 13658 [main] INFO c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #3 of 10
2018-10-16 13:25:33,117-0400 13658 [main] INFO c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-16 13:25:33,237-0400 13778 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Group":{"name":"groupStart","value":{"groupId":"b205e673-2eb1-4352-b207-fd4917be292a","requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
2018-10-16 13:25:33,238-0400 13779 [pool-1-thread-1] INFO c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Event":{"name":"jobStart","value":{"stats":{"totalTasks":1,"runningTasks":0,"completeTasks":0,"failedTasks":0},"coreMsUsed":-1,"requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
. . .
您可以使用 StringBuilder 来获得您想要的结果。
try {
StringBuilder sb = new StringBuilder();
while ((line = br.readLine()) != null) {
sb.append(line);
}
LOGGER.info("readLine(): {}", sb.toString());
} catch (IOException e) {
throw new RuntimeException("Unable to read status event stream", e);
}
似乎您正在逐行读取流,而不管块是如何传入的(BufferedReader 负责将数据重新构建为行)。但是,没有人保证单个块包含单个 json 记录。服务器可以根据其内部缓冲区对数据进行分块。
解析流时,您可以看看 Jackson Streaming API。编程模型有点复杂,但它可能适合您的需要。例子 https://www.baeldung.com/jackson-streaming-api or the default jre implementation https://docs.oracle.com/javaee/7/api/javax/json/stream/package-summary.html