KSQL如何在更新Stream时继续消费数据?
How KSQL continue consume data when update Stream?
我的 KSQL 问题是在更新流(终止查询并删除流,创建新流)并将数据发布到 'MainTopic'.
时数据丢失
我的 KSQL 架构是:
MAIN_STREAM ----> CONDITION_STREAM
我在终止查询和丢弃流期间丢失了数据,直到创建新的 CONDITION_STREAM。
可以建议在 'MainTopic' 中发布数据时更新新 CONDITION_STREAM 的方法,并在终止查询和丢弃流时继续使用数据。
原谅我的英语水平。
谢谢
我尝试对 CONDITION_STREAM 使用 'auto.offset.reset'='earliest' 但这会消耗 MAIN_STREAM.
MainTopic 中的所有数据
第 1 步:从主要主题和条件流创建 MAIN_STREAM 输出主题
CREATE STREAM MAIN_STREAM (event_id VARCHAR , payload VARCHAR) WITH (KAFKA_TOPIC='MainTopic', VALUE_FORMAT='json');
第 2 步:从 MAIN_STREAM
创建 CONDITION_STREAM 过滤器数据
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from MAIN_STREAM WHERE event = "payment";
第 3 步:终止 CONDITION_STREAM
的查询 ID
TERMINATE <CONDITION_STREAM_QUERY_ID>;
第 4 步:创建新的 CONDITION_STREAM
DROP STREAM CONDITION_STREAM;
第 5 步:创建新的 CONDITION_STREAM
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from main_stream WHERE event = "something " AND EXTRACTJSONFIELD(payload, '$.status') = 'init';
重新创建 CONDITION_STREAM
并让它从它停止的地方开始处理它的 main_stream
输入目前是不可能的,(从 ksqlDB v0.10 开始)。
然而,它是一个受到很多思考和关注的领域。目前有一项设计提案正在审查中,以采取允许现有流和 table 视图就地更新的第一步。有关详细信息,请参阅 https://github.com/confluentinc/ksql/pull/5611。
我的 KSQL 问题是在更新流(终止查询并删除流,创建新流)并将数据发布到 'MainTopic'.
时数据丢失我的 KSQL 架构是:
MAIN_STREAM ----> CONDITION_STREAM
我在终止查询和丢弃流期间丢失了数据,直到创建新的 CONDITION_STREAM。
可以建议在 'MainTopic' 中发布数据时更新新 CONDITION_STREAM 的方法,并在终止查询和丢弃流时继续使用数据。
原谅我的英语水平。 谢谢
我尝试对 CONDITION_STREAM 使用 'auto.offset.reset'='earliest' 但这会消耗 MAIN_STREAM.
MainTopic 中的所有数据第 1 步:从主要主题和条件流创建 MAIN_STREAM 输出主题
CREATE STREAM MAIN_STREAM (event_id VARCHAR , payload VARCHAR) WITH (KAFKA_TOPIC='MainTopic', VALUE_FORMAT='json');
第 2 步:从 MAIN_STREAM
创建 CONDITION_STREAM 过滤器数据CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from MAIN_STREAM WHERE event = "payment";
第 3 步:终止 CONDITION_STREAM
的查询 IDTERMINATE <CONDITION_STREAM_QUERY_ID>;
第 4 步:创建新的 CONDITION_STREAM
DROP STREAM CONDITION_STREAM;
第 5 步:创建新的 CONDITION_STREAM
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from main_stream WHERE event = "something " AND EXTRACTJSONFIELD(payload, '$.status') = 'init';
重新创建 CONDITION_STREAM
并让它从它停止的地方开始处理它的 main_stream
输入目前是不可能的,(从 ksqlDB v0.10 开始)。
然而,它是一个受到很多思考和关注的领域。目前有一项设计提案正在审查中,以采取允许现有流和 table 视图就地更新的第一步。有关详细信息,请参阅 https://github.com/confluentinc/ksql/pull/5611。