KTable-KTable FK join: can't select foreign key serde

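Summary

I'm trying to do a KTable-KTable foreign key join, but I get an error because Kafka Streams is trying to use the String serde for the foreign key.

I want it to use a Kotlinx Serialization serde. How can I specify this?

Details

I want to join the data of two KTables together, using an FK selector, and remap the values into an aggregate object.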

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes KTable
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

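However, I get an error, because Kafka Streams is using Serdes.String() (my default serde) to deserialize the foreign key. But the key is a JSON object, and I want it to be handled with Kotlinx Serialization.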

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in 
StreamConfig or provide correct Serdes via method 
parameters. Make sure the Processor can accept the 
deserialized input of type key: myproject.MyTopology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.

Note that although incorrect Serdes are a common cause 
of error, the cast exception might have another cause 
(in user code, for example). For example, if a 
processor wires in a store, but casts the generics 
incorrectly, a class cast exception could be raised 
during processing, but the cause would not be wrong Serdes.

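Background

The data I'm working with comes from a computer game. The game has a map, called a surface. Each surface is uniquely identified by a surface index. Each surface has tiles, on an x/y plane. The tiles have a 'prototype name', which is the ID of a TilePrototype. Each TilePrototype has information about what that tile does or how it looks. I need it for its colour.

Topology

Group tiles by chunk

First, I group the tiles into 32x32 chunks, and then collect them into a KTable.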

/** Each chunk is identified by the surface, and an x/y coordinate */
@Serializable
data class MapChunkDataPosition(
  val position: MapChunkPosition,
  val surfaceIndex: SurfaceIndex,
)

/** Each chunk has 32 tiles */
@Serializable
data class MapChunkData(
  val chunkPosition: MapChunkDataPosition,
  val tiles: Set<MapTile>,
)

// get all incoming tiles and group them by chunk,
// this works successfully
val tilesGroupedByChunk: KTable<MapChunkDataPosition, MapChunkData> =
  buildChunkedTilesTable(tilesTable)
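
The body of buildChunkedTilesTable is not shown in the question. As a rough illustration of the arithmetic behind 32x32 chunks, the sketch below maps a tile coordinate to a chunk coordinate with floor division, so that negative coordinates also land in the expected chunk. TilePosition, ChunkPosition and CHUNK_SIZE are hypothetical names invented for this example, not the types used in the question.

```kotlin
// Hypothetical types, for illustration only.
const val CHUNK_SIZE = 32

data class TilePosition(val x: Int, val y: Int)
data class ChunkPosition(val x: Int, val y: Int)

/** Map a tile coordinate to the coordinate of the chunk that contains it. */
fun TilePosition.toChunkPosition(): ChunkPosition =
    ChunkPosition(
        // floorDiv rounds towards negative infinity, unlike the `/` operator,
        // so tile x = -1 belongs to chunk x = -1 rather than chunk 0.
        x = x.floorDiv(CHUNK_SIZE),
        y = y.floorDiv(CHUNK_SIZE),
    )

fun main() {
    println(TilePosition(-1, 33).toChunkPosition()) // ChunkPosition(x=-1, y=1)
}
```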

Group prototypes by surface index

Then I collect all prototypes by surface index, and aggregate them into a list.


/** Identifier for a surface (a simple wrapper, so I can use a Kotlinx Serialization serde everywhere)*/
@Serializable
data class SurfaceIndex(
  val surfaceIndex: Int
)

/** Each surface has some 'prototypes' - I want this because each tile has a colour */
@Serializable
data class SurfacePrototypesData(
  val surfaceIndex: SurfaceIndex,
  val mapTilePrototypes: Set<MapTilePrototype>,
)

// get all incoming prototypes and group them by surface index,
// this works successfully
val tilePrototypesTable: KTable<SurfaceIndex, SurfacePrototypesData> =
  tilePrototypesTable()

KTable-KTable FK join

This is the code that causes the error.

/** For each chunk, get all tiles in that chunk, and all prototypes */
@Serializable
data class ChunkTilesAndProtos(
  val chunkTiles: MapChunkData,
  val protos: SurfacePrototypesData
)

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )
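
The `jsonMapper.serde()` helper in the code above comes from kotka-streams (linked in the code comment). For readers who have not seen it, here is a minimal sketch of what such a helper might look like. It is not the kotka-streams implementation: the name `jsonSerde` is made up for this example, and it takes an explicit KSerializer instead of using a reified type parameter. It relies only on `Serdes.serdeFrom` from Kafka and `Json.encodeToString` / `Json.decodeFromString` from Kotlinx Serialization.

```kotlin
import kotlinx.serialization.KSerializer
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.Json
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

/** Hypothetical helper: build a Kafka Serde that encodes values as UTF-8 JSON. */
fun <T : Any> Json.jsonSerde(kSerializer: KSerializer<T>): Serde<T> {
    val json = this
    val serializer = Serializer<T> { _, data ->
        // Pass null through untouched rather than trying to encode it.
        data?.let { json.encodeToString(kSerializer, it).encodeToByteArray() }
    }
    val deserializer = Deserializer<T> { _, bytes ->
        bytes?.let { json.decodeFromString(kSerializer, it.decodeToString()) }
    }
    return Serdes.serdeFrom(serializer, deserializer)
}

fun main() {
    // Int.serializer() is a built-in serializer, so no @Serializable class is needed here.
    val serde = Json.jsonSerde(Int.serializer())
    val bytes = serde.serializer().serialize("any-topic", 42)
    println(serde.deserializer().deserialize("any-topic", bytes)) // prints 42
}
```

For a class annotated with @Serializable, the generated `MyClass.serializer()` would be passed in the same way.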

Full stack trace

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: MyProject.processor.Topology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:131)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:105)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:186)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:54)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:29)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.apply(MeteredKeyValueStore.java:182)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.apply(MeteredKeyValueStore.java:179)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flushCache(CachingKeyValueStore.java:345)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:487)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1043)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1017)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.lang.ClassCastException: class MyProjectTopology$MapChunkData cannot be cast to class java.lang.String (MyProject.processor.MyProject$MapChunkData is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:99)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:69)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
... 30 common frames omitted

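Versions

Somewhat unexpectedly, I had made a mistake in my topology definition.

In the final stage of creating one of the tables, I mapped the values, but I didn't specify the serdes.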

    .mapValues { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }

So I changed it to specify the serdes.

    .mapValues(
      "finalise-web-map-tile-chunk-aggregation",
      materializedAs("web-map-tile-chunks", jsonMapper.serde(), jsonMapper.serde())
    ) { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }
// note: this uses extension functions from github.com/adamko-dev/kotka-streams 
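
For readers not using kotka-streams, the same idea can be sketched with the plain Kafka Streams API. This is a minimal sketch and not the author's code: the topic, processor and store names are invented, and String/Int stand in for the real value types so that no custom serde is needed. The point it illustrates is that `mapValues` changes the value type, so the serdes are passed explicitly through `Materialized`. That matches the stack trace, where StringSerializer is called from ForeignJoinSubscriptionSendProcessorSupplier on a MapChunkData value, which suggests the left-hand table had no value serde of its own and fell back to the default.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.ValueMapperWithKey
import org.apache.kafka.streams.state.KeyValueStore

/** Builds a small example topology and returns its textual description. */
fun describeExampleTopology(): String {
    val builder = StreamsBuilder()

    // Hypothetical source table with String keys and String values.
    val source: KTable<String, String> =
        builder.table("example-input", Consumed.with(Serdes.String(), Serdes.String()))

    // The value type changes from String to Int, so the source table's value
    // serde no longer applies. The serdes for the result are given explicitly.
    source.mapValues(
        ValueMapperWithKey<String, String, Int> { _, value -> value.length },
        Named.`as`("example-map-values"),
        Materialized.`as`<String, Int, KeyValueStore<Bytes, ByteArray>>("example-mapped-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Integer()),
    )

    // Building the topology does not need a running Kafka cluster.
    return builder.build().describe().toString()
}

fun main() {
    println(describeExampleTopology())
}
```

If a named state store is not needed, `Materialized.with(keySerde, valueSerde)` is a shorter way to pass the same two serdes.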

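Finding this wasn't easy. I found it by putting breakpoints in the constructor of AbstractStream.java (and the other constructors) to see when the keySerde and valueSerde fields were not set.

Sometimes a null serde is expected (I think some KTables/KStreams are 'virtual' and don't encode/decode to/from Kafka topics). But I was able to find the operation that was causing my problem, and define the serdes there, because I was changing the value type.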