在 Kafka Streams 的映射操作中指定 serdes
Specifying serdes on a map operation in Kafka Streams
如果这有任何重复,我很抱歉post,但我一直在寻找答案,但到目前为止一无所获。
我需要的是在更改 Kafka Streams 中的密钥类型的映射操作期间指定 serdes。
原始 KStream 具有字符串类型的键和 avro (GenericRecord) 值,但我需要将其重新映射为 avro 键和值。沿着这些线的东西:
KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
GenericRecord newKey = new GenericData.Record(someSchema);
...
return new KeyValue(newKey, newValue);
});
我认为我需要指定 serde,因为类型正在更改,但我发现无法在地图运算符上执行此操作。从主题读取、分组或回写主题时,我们通常可以做类似下面的事情来覆盖默认的 serdes:
KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic",
Consumed.with(keySerde, valueSerde));
KGroupedStream<GenericRecord, GenericRecord> groupedStream =
inputStream.groupBy((key, value)->somethingThatChangesTheKey(),
Grouped.with(newKeySerde, newValueSerde));
inputStream.to("someTopic", Produced.with(keySerde,valueSerde));
然而我完全不知道如何在类型改变时在地图中指定 serdes,在这种特殊情况下我不能使用我应用的默认 serdes。
我最接近找到解决方案的是 post 正确 ,但我担心接受的响应告诉 OP 他需要指定 serdes,而不是 如何这可以在地图中完成(至少据我所知,我可能会弄错)。
如有任何见解,我们将不胜感激。
您不能在 map()
上指定新的 serde,因为 map()
不需要 serde。 map()
运算符本身获取输入对象并生成输出对象,但它从不序列化或反序列化任何消息。
只有读取或写入 Kafka 主题的运算符允许您设置 serde,因为只有那些运算符才会使用 serde。
我不清楚你试图通过在 map()
运算符上设置 serde 来完成什么。能详细点吗?
如果这有任何重复,我很抱歉post,但我一直在寻找答案,但到目前为止一无所获。
我需要的是在更改 Kafka Streams 中的密钥类型的映射操作期间指定 serdes。 原始 KStream 具有字符串类型的键和 avro (GenericRecord) 值,但我需要将其重新映射为 avro 键和值。沿着这些线的东西:
KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
GenericRecord newKey = new GenericData.Record(someSchema);
...
return new KeyValue(newKey, newValue);
});
我认为我需要指定 serde,因为类型正在更改,但我发现无法在地图运算符上执行此操作。从主题读取、分组或回写主题时,我们通常可以做类似下面的事情来覆盖默认的 serdes:
KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic",
Consumed.with(keySerde, valueSerde));
KGroupedStream<GenericRecord, GenericRecord> groupedStream =
inputStream.groupBy((key, value)->somethingThatChangesTheKey(),
Grouped.with(newKeySerde, newValueSerde));
inputStream.to("someTopic", Produced.with(keySerde,valueSerde));
然而我完全不知道如何在类型改变时在地图中指定 serdes,在这种特殊情况下我不能使用我应用的默认 serdes。
我最接近找到解决方案的是 post 正确
如有任何见解,我们将不胜感激。
您不能在 map()
上指定新的 serde,因为 map()
不需要 serde。 map()
运算符本身获取输入对象并生成输出对象,但它从不序列化或反序列化任何消息。
只有读取或写入 Kafka 主题的运算符允许您设置 serde,因为只有那些运算符才会使用 serde。
我不清楚你试图通过在 map()
运算符上设置 serde 来完成什么。能详细点吗?