如何让推送查询连续/一次流式传输超过十分钟?

How to allow push queries to be streamed continuously/ for more than ten minutes at a time?

我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。

目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。 有什么方法可以让这个连接一直保持活动状态吗?

一种解决方法是每隔几分钟 ping ksqlDB 服务器一次。 但它显然会不必要地占用服务器的资源。 https://github.com/confluentinc/ksql/issues/6970

否则可以使用 Reactive Streams Subscriber 编写简单的 onError 方法(有关更多详细信息,请参阅 https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/),其中可以编写 re-subscription 逻辑。 因此,如果 ksqlDB 服务器在几个小时后发生下一次更改,超时异常将被 onError 方法捕获,然后我们可以 re-subscribe 推送查询。

@Override
    public synchronized void onError(Throwable t) {
        logger.error("Received an error in onError method : " + t);
        getPushQueryResult();
    }

public void getPushQueryResult() {
        try {
            client.streamQuery("query").thenAccept(streamedQueryResult -> {
                logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
                RowSubscriber subscriber = new RowSubscriber();
                streamedQueryResult.subscribe(subscriber);
            }).exceptionally(e -> {
                logger.error("Request failed : " + e);
                return null;
            });
        } catch (Exception e) {
            logger.error("Exception in getPushQueryResult method : " + e);
        }

    }