如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?

How to effectively chain groupby queries from flat api data in Kafka Streams?

我有一些来自 API 的随机数据进入 Kafka 主题,如下所示:

{"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "312.28", "city": "New York City", "state": "New York", "zipCode": "10014"}
{"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"}
{"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"}
{"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"}

我能够创建 KStream 个对象并观察它们,就像这样:

KStream<byte[], UsedCars> usedCarsInputStream = 
            builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));

            //k, v => year, countof cars in year
            KTable<String,Long> yearCount = usedCarsInputStream
                .filter((k,v)->v.getYear() > 2010)
                .selectKey((k,v) -> v.getVin())
                .groupBy((key, value) -> Integer.toString(value.getYear()))
                .count().toStream().print(Printed.<String, Long>toSysOut().withLabel("blah")); 

这当然为我们提供了按大于 2010 年的每一年分组的记录的计数。但是,我想在下一步中做但无法完成的是简单地获取这些记录中的每一个年,如 foreach,并计算每年每种颜色的汽车数量。我试图在 yearCount.toStream() 上写一个 foreach 来进一步处理数据,但没有得到任何结果。

我正在寻找可能如下所示的输出:

{
  "2011": [
    {
      "blue": "99",
      "green": "243,",
      "red": "33"
    }
  ],
  "2012": [
    {
      "blue": "74,",
      "green": "432,",
      "red": "2"
    }
  ]
}

我想我可能已经回答了我自己的问题。我欢迎任何其他人对我自己的解决方案发表评论。

我没有意识到的是,您可以对本质上是复合对象的对象进行 GroupBy。在这种情况下,我需要相当于以下 SQL 语句

SELECT   year, color, count(*) FROM use_car_colors AS years 
GROUP BY year, color

在 Kafka Streams 中,您可以通过创建一个对象来完成此操作——在这种情况下,我创建了一个名为 'YearColor' 的 POJO class,其中包含成员年份和颜色——然后 select 作为 Kafka Streams 中的键:

usedCarsInputStream
            .selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
            .groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
            .count()
            .toStream()
            .peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor() 
            + " count: " + ct));

您当然必须为此对象实现序列化器和反序列化器(我用 YearColorSerdes() 实现了)。当 运行 Kafka Streams 应用程序为我提供修改计数的更新时,我的输出如下:

year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2

这正是我要找的。