在 Spring Boot 中实现 KStream / Table 的困惑

Confusion on implementing KStream / Table in Spring Boot

我正在尝试获取一些 Spring-Boot kafka 流 "action" 工作的样本,但我似乎完全搞糊涂了:)

我正在通过网络接收 JSON 数据。我在 avro 中构建了一个模式,用于序列化数据:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}

{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

据我所知:

@Component
class KStreamTransformer {

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {

        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)

            out
        }.groupBy() ???
    }
}

我希望创建一个如下所示的 KTable:

|platformUID|state|Lat|Lon|Alt| |------------|-----|---|---|---|

这就是我感到困惑的地方。

我假设我想在 PlatformUID 字段上做一个 GroupBy,但我不清楚如何实际进行。

有人能给我指出正确的方向吗?

我想我正在寻找的是获取 input 流并将其转换为 KTable,键为 value.getUID(),值为之前的值 [=18] =]

如果platformUID已经是数据生产者使用的key,可以使用

KTable ktable = kstream
                 .groupByKey()
                 .reduce((oldValue, newValue) -> newValue)

如果没有,应该在.groupBy()中放一个KeyValueMapper,看起来像

KTable ktable = kstream
                 .groupBy((k, v) -> v.getPlatformUID())
                 .reduce((oldValue, newValue) -> newValue)

它将创建一个内部主题,使用新密钥对源主题重新分区。

对于java7,KeyValueMapper的语法如下:

KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
        public K apply(K key, V1 value) {
            return key;
        }
    };