KSQL如何在更新Stream时继续消费数据?

How KSQL continue consume data when update Stream?

我的 KSQL 问题是在更新流(终止查询并删除流,创建新流)并将数据发布到 'MainTopic'.

时数据丢失

我的 KSQL 架构是:

MAIN_STREAM ----> CONDITION_STREAM

我在终止查询和丢弃流期间丢失了数据,直到创建新的 CONDITION_STREAM。

可以建议在 'MainTopic' 中发布数据时更新新 CONDITION_STREAM 的方法,并在终止查询和丢弃流时继续使用数据。

原谅我的英语水平。 谢谢

我尝试对 CONDITION_STREAM 使用 'auto.offset.reset'='earliest' 但这会消耗 MAIN_STREAM.

MainTopic 中的所有数据

第 1 步:从主要主题和条件流创建 MAIN_STREAM 输出主题

CREATE STREAM MAIN_STREAM (event_id VARCHAR , payload VARCHAR) WITH (KAFKA_TOPIC='MainTopic', VALUE_FORMAT='json');

第 2 步:从 MAIN_STREAM

创建 CONDITION_STREAM 过滤器数据
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from MAIN_STREAM WHERE  event = "payment";

第 3 步:终止 CONDITION_STREAM

的查询 ID
TERMINATE <CONDITION_STREAM_QUERY_ID>;

第 4 步:创建新的 CONDITION_STREAM

DROP STREAM CONDITION_STREAM;

第 5 步:创建新的 CONDITION_STREAM

CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from main_stream WHERE  event = "something " AND EXTRACTJSONFIELD(payload, '$.status') = 'init';

重新创建 CONDITION_STREAM 并让它从它停止的地方开始处理它的 main_stream 输入目前是不可能的,(从 ksqlDB v0.10 开始)。

然而,它是一个受到很多思考和关注的领域。目前有一项设计提案正在审查中,以采取允许现有流和 table 视图就地更新的第一步。有关详细信息,请参阅 https://github.com/confluentinc/ksql/pull/5611