如何让推送查询连续/一次流式传输超过十分钟?
How to allow push queries to be streamed continuously/ for more than ten minutes at a time?
我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。
目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。
有什么方法可以让这个连接一直保持活动状态吗?
一种解决方法是每隔几分钟 ping ksqlDB 服务器一次。
但它显然会不必要地占用服务器的资源。
https://github.com/confluentinc/ksql/issues/6970
否则可以使用 Reactive Streams Subscriber 编写简单的 onError 方法(有关更多详细信息,请参阅 https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/),其中可以编写 re-subscription 逻辑。
因此,如果 ksqlDB 服务器在几个小时后发生下一次更改,超时异常将被 onError 方法捕获,然后我们可以 re-subscribe 推送查询。
@Override
public synchronized void onError(Throwable t) {
logger.error("Received an error in onError method : " + t);
getPushQueryResult();
}
public void getPushQueryResult() {
try {
client.streamQuery("query").thenAccept(streamedQueryResult -> {
logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
RowSubscriber subscriber = new RowSubscriber();
streamedQueryResult.subscribe(subscriber);
}).exceptionally(e -> {
logger.error("Request failed : " + e);
return null;
});
} catch (Exception e) {
logger.error("Exception in getPushQueryResult method : " + e);
}
}
我正在为 ksqlDB 开发一个 java 消费者,它使用 PULL 和 PUSH 查询。
目前,单个推送查询一次最多可以流式传输十分钟,之后如果消费者空闲,服务器将关闭连接。即使流或表发生变化,由于连接超时,我们也无法接收到这些变化。 有什么方法可以让这个连接一直保持活动状态吗?
一种解决方法是每隔几分钟 ping ksqlDB 服务器一次。 但它显然会不必要地占用服务器的资源。 https://github.com/confluentinc/ksql/issues/6970
否则可以使用 Reactive Streams Subscriber 编写简单的 onError 方法(有关更多详细信息,请参阅 https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/),其中可以编写 re-subscription 逻辑。 因此,如果 ksqlDB 服务器在几个小时后发生下一次更改,超时异常将被 onError 方法捕获,然后我们可以 re-subscribe 推送查询。
@Override
public synchronized void onError(Throwable t) {
logger.error("Received an error in onError method : " + t);
getPushQueryResult();
}
public void getPushQueryResult() {
try {
client.streamQuery("query").thenAccept(streamedQueryResult -> {
logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
RowSubscriber subscriber = new RowSubscriber();
streamedQueryResult.subscribe(subscriber);
}).exceptionally(e -> {
logger.error("Request failed : " + e);
return null;
});
} catch (Exception e) {
logger.error("Exception in getPushQueryResult method : " + e);
}
}