Kafka Stream - 按 client_id 过滤

Kafka Stream - Filter by client_id

我正在使用 Kafka Stream 创建一个仅包含特定于 client_id 的数据的 ktable,这不是主题键。我是 Kafka Streams 的新手,它看起来非常简单,但我对社区中可用的多个示例感到有点困惑,这些示例非常好。

我正在尝试获取具有 client_id=0123456 的 inputTopic 数据。在下面的 KSQL 中类似于命令:

CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;

下面我正在尝试重现相同的行为。有人可以告诉我在下面做错了什么吗?它没有像我预期的那样过滤。

        final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
        final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));

v 是消息的全部值。在KSQL中要有命名字段,流上有一个关联的模式,例如数据是JSON还是Avro,这意味着clientid只是值

的一部分