卡夫卡流 - 如何分组两次?

Kafka stream - how to group by twice?

我想创建一个条形图来显示图像中有多少像素颜色;图像每 3 秒更新一次,因此我的条形图也会更新。

我有一个主题收集 JSON 个对象,这些对象的键是图像创建日期,值是十六进制值(例如#FFF)。

我想按键分组,所以它按图像分组,然后按每个组的十六进制值分组并执行 .count()。

你是怎么做到的?

我在想 streams.groupByKey()...然后按十六进制值分组,但我需要将 KTable 转换为 KStream...

更新

抱歉,我在 phone 上打字时没有解释清楚。 我会再试着解释一下。

顺便说一句,我改变了一些东西。这是我的 github 如果您想阅读我在做什么:https://github.com/Lilmortal.

但它很长,我怀疑你会读它哈哈。

不管怎样,我阅读一个主题时,都会收到这样的消息 {imagePath: {hexCode: #fff}}。 图片路径是key,hexCode是value。我可以有一对多的 imagePaths,所以我的想法是我的前端将有一个 websocket 来接收它。它将显示一个图像,并且在其顶部有一个条形图,其中包含像素颜色代码的数量。例如有4个#fff、28个#fef等

因此我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。

例如:

所以这里imagePath1有47个#fff,imagePath2有23个#fff。

这就是我在 atm 上尝试做的事情。

也许 select 在分组前通过复合键?像这样:

SteamsBuilder topology = new StreamsBuilder();

topology.stream("input")
   .selectKey((k, v) -> k + v.hex)
   .groupByKey()
   .count()

这不是 groupBy 两次,而是让你得到想要的效果。


更新 评论后:

class Image {
    public String imagePath;
}

class ImageAggregation {
    public String imagePath;
    public int count;
}

class ImageSerde implements Serde<Image> {
    // implement
}

class ImageAggregationSerde implements Serde<ImageAggregation> {
    // implement   
}

KTable<String, ImageAggregation> table = topology
  .stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 agg.count = agg.count + 1;
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());

更新 2 post 更新后:

class ImageHex {
    public String imagePath;
    public String hex;
}

class ImageHexAggregation {
    public String imagePath;
    public Map<String, Integer> counts;
}

class ImageHexSerde implements Serde<ImageHex> {
    // implement
}

class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
    // implement   
}

KTable<String, ImageHexAggregation> table = topology
  .stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
  .groupBy((k, v) -> v.imagePath)
  .aggregate(ImageHexAggregation::new,
             (k, v, agg) -> {
                 agg.imagePath = v.imagePath;
                 Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
                 agg.counts.put(v.hex, currentCount + 1));
                 return agg;
             }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());