How to Stream to a Global Kafka Table

I have a Kafka Streams application that needs to join an incoming stream against a global table and then, after some processing, write the aggregation result back to that table:

KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(storeName);

Materialized<String, String, KeyValueStore<Bytes, byte[]>> m =
    Materialized.as(supplier);

GlobalKTable<String, String> table = builder.globalTable(
    topic,
    m.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

stream.leftJoin(table, ...)
    .groupByKey()
    .aggregate(...)
    .toStream()
    .through(topic, Produced.with(Serdes.String(), Serdes.String()));

However, when I try to stream into the KTable changelog, I get the following error: Invalid topology: Topic 'topic' has already been registered by another source.

If I instead try to aggregate to the store itself, I get the following error: InvalidStateStoreException: Store 'store' is currently closed.

How can I join against the table and write back to its changelog?

If that is not possible, a solution that involves filtering the incoming log against the store would also work.

Calling through() is a shortcut for

stream.to("topic");
KStream stream2 = builder.stream("topic");

Because you already used builder.stream("topic"), you get Invalid topology: Topic 'topic' has already been registered by another source., since each topic can be consumed only once. If you want to feed the data of a stream/topic into different parts of your topology, you need to reuse the original KStream you created for that topic:

KStream stream = builder.stream("topic");

// this won't work
KStream stream2 = stream.through("topic");

// rewrite to
stream.to("topic");
KStream stream2 = stream; // or just omit `stream2` and reuse `stream`
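Applied to the topology in the question, the fix is to replace through(topic) with to(topic): the GlobalKTable has already registered topic as its source, so it will consume whatever is written there and update its store on its own. Below is a minimal sketch under that assumption; the topic/store names, the key mapper, the value joiner, and the aggregator are hypothetical stand-ins for the "..." in the question:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

String topic = "table-topic";       // hypothetical topic/store names
String inputTopic = "input-topic";
String storeName = "store";

StreamsBuilder builder = new StreamsBuilder();

// the GlobalKTable is the only consumer registered for `topic`
GlobalKTable<String, String> table = builder.globalTable(
    topic,
    Materialized.<String, String>as(Stores.persistentKeyValueStore(storeName))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));

KStream<String, String> stream =
    builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

stream
    // hypothetical key mapper and value joiner
    .leftJoin(table,
        (key, value) -> key,
        (value, tableValue) -> value + "|" + tableValue)
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    // hypothetical aggregator: keep the latest joined value per key
    .aggregate(
        () -> "",
        (key, value, aggregate) -> value,
        Materialized.with(Serdes.String(), Serdes.String()))
    .toStream()
    // to() only writes; no second source is registered for `topic`
    .to(topic, Produced.with(Serdes.String(), Serdes.String()));

Note that this wires the aggregate output back into the topic the join reads from, so the updates travel through the broker and show up in the global store asynchronously, not within the same record's processing.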

Not sure what you mean by:

If I try to aggregate to the store itself