当未指定默认 serdes 并使用自定义 serdes 时,KStream 上的映射操作失败 -> org.apache.kafka.streams.errors.StreamsException

Map operation over a KStream fails when not specifying default serdes and using custom ones -> org.apache.kafka.streams.errors.StreamsException

因为我使用的是 Json 值,所以我没有设置默认的 serdes。

我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes,但下一步(映射操作)失败了:

val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers

val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))

builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
            .map { key, value ->
                KeyValue(key, XXX)
            }
            .aggregate(...)

如果我删除地图操作,执行就会正常。

我还没有找到为 map() 指定 serdes 的方法,怎么办?

错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)

多个问题:

  1. 调用 map() 后调用 groupByKey().aggregate()。这会触发数据重新分区,因此在 map() 数据被写入内部主题以进行数据重新分区之后。因此,您也需要在groupByKey()内提供相应的Serde

  2. 但是,因为您没有修改密钥,所以您实际上应该调用 mapValues(),以避免不必要的重新分区。

  3. 请注意,您需要为不应使用配置中的默认 Serde 的每个运算符提供 Serdes。 Serdes 不会沿下游传递,而是运算符就地覆盖。 (Kafka 2.1 正在改进这一点。)

如果有人像我一样登陆这里,这里有一个演示 Matthias 解决方案的实际示例。由于 ClassCastException,以下代码将失败。

builder.stream(
        "my-topic",
        Consumed.with(Serdes.String(), customSerde))
        .map((k, v) -> {
            // You've modified the key. Get ready for an error!
            String newKey = k.split(":")[0];
            return KeyValue.pair(newKey.toString(), v);
        })
        .groupByKey()
        .aggregate(
            MyAggregate::new,
            (key, data, aggregation) -> {
                // You'll never reach this block due to an error similar to:
                // ClassCastException while producing data to topic
                return aggregation.updateFrom(shot);
            },
            Materialized.<String, MyAggregate> as(storeSupplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(aggregateSerde)
        )

正如 Matthias 提到的,“您需要在 groupByKey 中提供相应的 Serdes”。方法如下:

builder.stream(
        "my-topic",
        Consumed.with(Serdes.String(), customSerde))
        .map((k, v) -> {
            // You've modified the key. Get ready for an error!
            String newKey = k.split(":")[0];
            return KeyValue.pair(newKey.toString(), v);
        })
        // Voila, you've specified the serdes required by the new internal
        // repartition topic and you can carry on with your work
        .groupByKey(Grouped.with(Serdes.String(), customSerde))
        .aggregate(
            MyAggregate::new,
            (key, data, aggregation) -> {
                // You'll never reach this block due to an error similar to:
                // ClassCastException while producing data to topic
                return aggregation.updateFrom(shot);
            },
            Materialized.<String, MyAggregate> as(storeSupplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(aggregateSerde)
        )