如何在 KSQL 中 select/assign 分区

How to select/assign partition in KSQL

我认为这是一个相关问题: Using Kafka KSQL to select all events of a topic from a specific partition with given offset

如何通过 KSQL select/assign 分区?我试图阻止 KSQL 从所有分区读取,因为必要的数据只存在于一个分片中。

例如:

CLI v5.4.1,服务器 v5.4.1

SET 'auto.offset.reset'='earliest';
CREATE STREAM SOURCE_STREAM (FIELD_1 BIGINT)
    WITH (
        VALUE_FORMAT='AVRO',
        KAFKA_TOPIC='source_topic',
        PARTITIONS=2,
        REPLICAS=1
    );

插入一些位于分区 0 和分区 1 中的模拟数据(没有真正分配,但例如)

INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (123);  # say in partition 0
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (456);  # say in partition 1

对于消费者 API 可以执行以下操作:

consumer.assign(TopicPartition(topic=source_topic, partition=0))
consumer.assign(TopicPartition(topic=source_topic, partition=1))
consumer.get()

但是,对于当前 API,我不确定如何在客户端级别或服务器 属性 级别 "assign" 分区。以下派生流将从所有分区读取:

CREATE STREAM DERIVATIVE_STREAM AS 
    SELECT
        FIELD_1
    FROM SOURCE_STREAM
    EMIT CHANGES;

EXPLAIN CSAS_DERIVATIVE_STREAM_n;

(我知道我可以使用 WHERE 语句来过滤数据,但我想明确地从分区 0|1 中读取)

ksqlDB 不是这样工作的。您使用 SQL 来声明 what 你想要什么,而不是 how 你想要它。

正如您在问题中所说,您可以使用 WHERE 将谓词应用于您的查询,并可以使用 ROWKEY 来定位消息键值。

我想 RDBMS 世界中的并行将是基于成本的优化器的执行计划提示。

如果您想将此记录为对 ksqlDB 的增强请求,请在此处进行:https://github.com/confluentinc/ksql/issues/new