如何流式传输到全局 Kafka Table
How to Stream to a Global Kafka Table
我有一个 Kafka Streams 应用程序,它需要根据全局 table 加入传入流,然后在进行一些处理后,将聚合结果写回 table:
KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(
storeName
);
Materialized<String, String, KeyValueStore<Bytes, byte[]>> m = Materialized.as(
supplier
);
GlobalKTable<String, String> table = builder.globalTable(
topic, m.withKeySerde(
Serdes.String()
).withValueSerde(
Serdes.String()
)
);
stream.leftJoin(
table
...
).groupByKey().aggregate(
...
).toStream().through(
topic, Produced.with(Serdes.String(), Serdes.String())
);
但是,当我尝试流入 KTable 更新日志时,出现以下错误:Invalid topology: Topic 'topic' has already been registered by another source.
如果我尝试聚合到商店本身,我会收到以下错误:InvalidStateStoreException: Store 'store' is currently closed
。
如何加入 table 并写回其变更日志?
如果这不可能,涉及针对存储过滤传入日志的解决方案也可行。
调用through()
是
的捷径
stream.to("topic");
KStream stream2 = builder.stream("topic");
因为您已经使用了 builder.stream("topic")
,所以您会得到 Invalid topology: Topic 'topic' has already been registered by another source.
,因为每个主题只能使用一次。如果你想将 stream/topic 的数据馈送到不同的部分,你需要重新使用你为这个主题创建的原始 KStream
:
KStream 流 = builder.stream("topic");
// this won't work
KStream stream2 = stream.through("topic");
// rewrite to
stream.to("topic");
KStream stream2 = stream; // or just omit `stream2` and reuse `stream`
不确定你的意思
If I try to aggregate to the store itself
我有一个 Kafka Streams 应用程序,它需要根据全局 table 加入传入流,然后在进行一些处理后,将聚合结果写回 table:
KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(
storeName
);
Materialized<String, String, KeyValueStore<Bytes, byte[]>> m = Materialized.as(
supplier
);
GlobalKTable<String, String> table = builder.globalTable(
topic, m.withKeySerde(
Serdes.String()
).withValueSerde(
Serdes.String()
)
);
stream.leftJoin(
table
...
).groupByKey().aggregate(
...
).toStream().through(
topic, Produced.with(Serdes.String(), Serdes.String())
);
但是,当我尝试流入 KTable 更新日志时,出现以下错误:Invalid topology: Topic 'topic' has already been registered by another source.
如果我尝试聚合到商店本身,我会收到以下错误:InvalidStateStoreException: Store 'store' is currently closed
。
如何加入 table 并写回其变更日志?
如果这不可能,涉及针对存储过滤传入日志的解决方案也可行。
调用through()
是
stream.to("topic");
KStream stream2 = builder.stream("topic");
因为您已经使用了 builder.stream("topic")
,所以您会得到 Invalid topology: Topic 'topic' has already been registered by another source.
,因为每个主题只能使用一次。如果你想将 stream/topic 的数据馈送到不同的部分,你需要重新使用你为这个主题创建的原始 KStream
:
KStream 流 = builder.stream("topic");
// this won't work
KStream stream2 = stream.through("topic");
// rewrite to
stream.to("topic");
KStream stream2 = stream; // or just omit `stream2` and reuse `stream`
不确定你的意思
If I try to aggregate to the store itself