Kafka为什么要改店名

Why does Kafka change store name

我的申请有问题。

代码:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
        return newVal;
    }, Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").with(longSerde, byteSerde));

这里我把Store Name设置为networkStore,但是当我列出Kafka主题时,store的名字是network-service-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog

我想要的是:-商店的名称是 networkStore,以便我以后可以从中读取。

当我现在尝试从商店读取时,出现以下异常:

org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, networkStore, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1039) at com.maxflow.networksservice.utils.NetworksServiceUtils.updateGraphForCompany(NetworksServiceUtils.java:41) at com.maxflow.networksservice.consumer.NodesConsumer.run(NodesConsumer.java:99) at java.lang.Thread.run(Thread.java:748)

使用以下内容:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
        return newVal;
    }, Materialized.with(longSerde, byteSerde).as("networkStore"));

Materialized.as().with() 正在用内部名称覆盖自定义名称。因此你 应该在 .with() 之后调用 .as() 方法。您可以在此处阅读更多详细信息。

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Materialized.html#with-org.apache.kafka.common.serialization.Serde-org.apache.kafka.common.serialization.Serde-

另一种选择是使用 .withKeySerde().withValueSerde() 方法以及自定义商店名称,如下所示。

Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").withKeySerde(longSerde).withValueSerde(byteSerde)