Spring Cloud Stream Kafka Streams 入站 KTable 可预测的内部状态存储主题名称
Spring Cloud Stream Kafka Streams inbound KTable predictable internal state-store topic names
我们将 Kafka Streams 与 Spring Cloud Stream Functions 结合使用。我们有一个典型的示例应用程序,它将用户点击 kstream
与用户区域 ktable
.
连接起来
我们知道我们可以通过在定义拓扑时使用接受物化存储名称的适当方法来强制内部更改日志或重新分区主题的自定义名称:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> bifunctionktable() {
return (userClicksStream, userRegionsTable) -> userClicksStream
.leftJoin(userRegionsTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null, "bifunctionktable-leftjoin"))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()).withName("bifunctionktable-groupbykey"))
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks, Materialized.as("bifunctionktable-reduce"))
.toStream();
}
但是对于输入的 KTable,我们无法更改其状态存储内部主题名称,我们总是得到这个主题名称:myapp-id-user-regions-STATE-STORE-0000000001-changelog
如果我们完全通过代码创建拓扑,我们确实有 builder.table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
方法,但是使用函数...在这种情况下,有什么方法可以自定义输入 KTable 的内部主题名称吗?
您可以使用以下 属性 为传入的 KTable
添加自定义名称:
spring.cloud.stream.kafka.streams.bindings.bifunctionktable-in-1.consumer.materializedAs: <Your-custom-store-name>
这在参考文档的 this section 中有记载。
我们将 Kafka Streams 与 Spring Cloud Stream Functions 结合使用。我们有一个典型的示例应用程序,它将用户点击 kstream
与用户区域 ktable
.
我们知道我们可以通过在定义拓扑时使用接受物化存储名称的适当方法来强制内部更改日志或重新分区主题的自定义名称:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> bifunctionktable() {
return (userClicksStream, userRegionsTable) -> userClicksStream
.leftJoin(userRegionsTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null, "bifunctionktable-leftjoin"))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()).withName("bifunctionktable-groupbykey"))
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks, Materialized.as("bifunctionktable-reduce"))
.toStream();
}
但是对于输入的 KTable,我们无法更改其状态存储内部主题名称,我们总是得到这个主题名称:myapp-id-user-regions-STATE-STORE-0000000001-changelog
如果我们完全通过代码创建拓扑,我们确实有 builder.table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
方法,但是使用函数...在这种情况下,有什么方法可以自定义输入 KTable 的内部主题名称吗?
您可以使用以下 属性 为传入的 KTable
添加自定义名称:
spring.cloud.stream.kafka.streams.bindings.bifunctionktable-in-1.consumer.materializedAs: <Your-custom-store-name>
这在参考文档的 this section 中有记载。