如何创建一个指定 json 序列化器和商店名称及其物化定义的 KTable

How to create a KTable specifying a json serializer and a store name with its Materialized definition

我想创建一个具有关联状态存储的 KTable,可以使用交互式查询进行查询,例如:

val builder = StreamsBuilder()
        builder.table(CUSTOMERS_TOPIC, Materialized.`as`<String, Customer, KeyValueStore<Bytes, ByteArray>>(CUSTOMERS_STORE))

然而,为了序列化我的客户值 class,我需要指定一个 Json 序列化程序。我可以使用 StreamsBuilder 中的这个方法来做到这一点:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                                  final Consumed<K, V> consumed) {

消耗了这个:

Consumed.with(Serdes.String(), Serdes.serdeFrom(JsonPojoSerializer<Customer>(), JsonPojoDeserializer(Customer::class.java)

但是如您所见,无法设置商店名称。此外,根据 javadoc,内部状态存储可能不可查询:

with an internal store name. Note that store name may not be queriable through Interactive Queries

那么如何使用指定名称配置实体化并指示必要的 Json 序列化程序?

在 Scala API 中,Serdes 是通过隐式解析的。这就是为什么没有重载传递 Consumed 参数的原因。比照。 https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala#L88-L129

对于Java,有一个方法重载允许你传入两个参数:

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {

比照。 https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized-