Kafka Stream: Record & Aggregate

[
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 100,
        "mtrc2": 25,
        "starttime": "2018-05-04 01:00:00",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 5,
        "mtrc2": 11,
        "starttime": "2018-05-04 02:00:00",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 35,
        "mtrc2": 15,
        "starttime": "2018-05-04 01:00:00",
        "model": "t40"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 53,
        "mtrc2": 22,
        "starttime": "2018-05-04 02:00:00",
        "model": "t40"
    }
]

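Suppose each Kafka message contains an array of JSON objects like the one above. I want to use KStream/KTable to group by device_nm, type, and truncate(starttime) (that is, starttime truncated to the date), and to aggregate (sum) mtrc1 and mtrc2. The output should look like this: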

[
    {
        "device_nm": "x1",
        "type": "external",
        "mtrc1": 105,
        "mtrc2": 36,
        "date": "2018-05-04",
        "model": "t20"
    },
    {
        "device_nm": "x1",
        "type": "internal",
        "mtrc1": 88,
        "mtrc2": 37,
        "date": "2018-05-04",
        "model": "t40"
    }
]

How can we use the aggregate API and still keep all the attributes?

Kafka Streams groups and aggregates by key, so you need a composite key that carries every field you group on.

Define the key and value classes:
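// Composite key. Fields are public so Jackson can (de)serialize them without getters.
public class MKey {
    public String device;
    public String type;
    public String date;   // starttime truncated to the day, needed to group per date as the question asks
}

// Value that is summed per key.
public class MBody {
    public int mtrc1;
    public int mtrc2;
}

// sb is a StreamsBuilder. This assumes the topic is already keyed by MKey
// and that each record value is a single MBody.
KStream<MKey, MBody> stream =
        sb.stream("topic",
                  Consumed.with(new JsonSerde<>(MKey.class), new JsonSerde<>(MBody.class, objectMapper)));

// Sum both metrics per key; the resulting KTable holds the running totals.
KTable<MKey, MBody> totals =
    stream.groupByKey()
          .aggregate(MBody::new,
                     (key, value, aggr) -> {
                         aggr.mtrc2 += value.mtrc2;
                         aggr.mtrc1 += value.mtrc1;
                         return aggr;
                     },
                     // Explicit serdes for the state store, so the default serdes are not used.
                     Materialized.with(new JsonSerde<>(MKey.class), new JsonSerde<>(MBody.class, objectMapper)));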
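
The snippet above assumes the records already arrive keyed, while the question starts from one message holding a JSON array. As a rough illustration of the missing step, here is a plain-Java sketch (no Kafka dependency) of how the composite key could be derived from each array element and how the extra attribute model can be carried through the aggregate. The names Reading, GroupKey, Totals, keyOf and aggregate are hypothetical and chosen only for this example; in a real topology the key derivation would presumably sit in a flatMap step before groupByKey.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DailyAggregation {

    // One element of the incoming JSON array (fields follow the question's sample).
    public record Reading(String deviceNm, String type, int mtrc1, int mtrc2,
                          String starttime, String model) {}

    // Composite key: every field the records are grouped by.
    public record GroupKey(String deviceNm, String type, LocalDate date) {}

    // Aggregate value: summed metrics plus the attribute that is carried through.
    public record Totals(int mtrc1, int mtrc2, String model) {
        static Totals of(Reading r) {
            return new Totals(r.mtrc1(), r.mtrc2(), r.model());
        }

        // Adds the metrics and keeps the most recent model value.
        Totals plus(Totals other) {
            return new Totals(mtrc1 + other.mtrc1, mtrc2 + other.mtrc2, other.model);
        }
    }

    private static final DateTimeFormatter START_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Derives the key, truncating starttime to its calendar date.
    public static GroupKey keyOf(Reading r) {
        LocalDate date = LocalDateTime.parse(r.starttime(), START_TIME).toLocalDate();
        return new GroupKey(r.deviceNm(), r.type(), date);
    }

    // Folds the readings into per-key totals, in first-seen key order.
    public static Map<GroupKey, Totals> aggregate(List<Reading> readings) {
        Map<GroupKey, Totals> totals = new LinkedHashMap<>();
        for (Reading r : readings) {
            totals.merge(keyOf(r), Totals.of(r), Totals::plus);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Reading> batch = List.of(
                new Reading("x1", "external", 100, 25, "2018-05-04 01:00:00", "t20"),
                new Reading("x1", "external", 5, 11, "2018-05-04 02:00:00", "t20"),
                new Reading("x1", "internal", 35, 15, "2018-05-04 01:00:00", "t40"),
                new Reading("x1", "internal", 53, 22, "2018-05-04 02:00:00", "t40"));
        aggregate(batch).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Run against the sample input from the question, this yields two groups with totals 105/36 (external, t20) and 88/37 (internal, t40), matching the expected output. Keeping model in the value rather than the key is a design choice for this sketch: it works because model is constant within each group in the sample data; if it could vary, it would need to be part of the key instead.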