如何在 Spring Kafka 中接收来自 KSQL 的流式响应?

How to receive streamed response from KSQL in Spring Kafka?

如何在 java spring 启动应用程序中接收来自 kafka KSQL 服务器的分块响应?

当我对 /query 端点进行休息调用时,我只得到 1 行并且连接关闭。如何保持连接打开并接收多行?

文档说

The response is streamed back until the LIMIT specified in the statement is reached, or the client closes the connection.

在java中实现这个的方法是什么?即使对于 KTable,我在 return.

中也只有 1 行

https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output

我能够解决的方法如下:

  • 获取字符串形式的响应
  • 逐行解析JSON个对象(KafkaQueryResponse是一个代表1行的对象)

        ResponseEntity<String> result = template.exchange("/query",
            HttpMethod.POST,
            new HttpEntity<>(params, headers),
            String.class);
    
        List<KafkaQueryResponse> array = new ArrayList<>();
        JsonFactory jsonFactory = new JsonFactory();
        try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
            Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
            value.forEachRemaining(e -> {
                if (e.getRow() != null) {
                    array.add(e);
                }
            });
        }
        array <----  this is the list of JSON objects
    

KafkaQueryResponse

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class KafkaQueryResponse {
        private KafkaQueryRow row;
        private String finalMessage;
        private String errorMessage;

        @Data
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class KafkaQueryRow {
            private List<Object> columns;
        }
    }

此解决方案不允许读取块中的流式响应。它等待整个响应到达客户端,然后关闭连接,然后解析所有 json 个对象。