卡夫卡流 - 如何分组两次?
Kafka stream - how to group by twice?
我想创建一个条形图来显示图像中有多少像素颜色;图像每 3 秒更新一次,因此我的条形图也会更新。
我有一个主题收集 JSON 个对象,这些对象的键是图像创建日期,值是十六进制值(例如#FFF)。
我想按键分组,所以它按图像分组,然后按每个组的十六进制值分组并执行 .count()。
你是怎么做到的?
我在想 streams.groupByKey()...然后按十六进制值分组,但我需要将 KTable 转换为 KStream...
更新
抱歉,我在 phone 上打字时没有解释清楚。
我会再试着解释一下。
顺便说一句,我改变了一些东西。这是我的 github 如果您想阅读我在做什么:https://github.com/Lilmortal.
- 我的项目 "HexGraph-source-connector" 在
指定目录并将图片路径推送到主题
- "HexGraph"项目捡起来,用Akka,演员们会得到
所有像素十六进制代码分别开始推送消息
转到另一个主题。
- "HexGraph-stream" 是我的 kafka 流部分。
但它很长,我怀疑你会读它哈哈。
不管怎样,我阅读一个主题时,都会收到这样的消息 {imagePath: {hexCode: #fff}}。
图片路径是key,hexCode是value。我可以有一对多的 imagePaths,所以我的想法是我的前端将有一个 websocket 来接收它。它将显示一个图像,并且在其顶部有一个条形图,其中包含像素颜色代码的数量。例如有4个#fff、28个#fef等
因此我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。
例如:
- {imagePath1: {hexCode: #fff, 计数: 47}}
- {imagePath1: {hexCode: #fef, 计数: 61}}
- {imagePath2: {hexCode: #fff, 计数: 23}}
- {imagePath2: {hexCode: #fef, 计数: 55}}
所以这里imagePath1有47个#fff,imagePath2有23个#fff。
这就是我在 atm 上尝试做的事情。
也许 select 在分组前通过复合键?像这样:
SteamsBuilder topology = new StreamsBuilder();
topology.stream("input")
.selectKey((k, v) -> k + v.hex)
.groupByKey()
.count()
这不是 groupBy 两次,而是让你得到想要的效果。
更新 评论后:
class Image {
public String imagePath;
}
class ImageAggregation {
public String imagePath;
public int count;
}
class ImageSerde implements Serde<Image> {
// implement
}
class ImageAggregationSerde implements Serde<ImageAggregation> {
// implement
}
KTable<String, ImageAggregation> table = topology
.stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
agg.count = agg.count + 1;
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());
更新 2 post 更新后:
class ImageHex {
public String imagePath;
public String hex;
}
class ImageHexAggregation {
public String imagePath;
public Map<String, Integer> counts;
}
class ImageHexSerde implements Serde<ImageHex> {
// implement
}
class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
// implement
}
KTable<String, ImageHexAggregation> table = topology
.stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageHexAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
agg.counts.put(v.hex, currentCount + 1));
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());
我想创建一个条形图来显示图像中有多少像素颜色;图像每 3 秒更新一次,因此我的条形图也会更新。
我有一个主题收集 JSON 个对象,这些对象的键是图像创建日期,值是十六进制值(例如#FFF)。
我想按键分组,所以它按图像分组,然后按每个组的十六进制值分组并执行 .count()。
你是怎么做到的?
我在想 streams.groupByKey()...然后按十六进制值分组,但我需要将 KTable 转换为 KStream...
更新
抱歉,我在 phone 上打字时没有解释清楚。 我会再试着解释一下。
顺便说一句,我改变了一些东西。这是我的 github 如果您想阅读我在做什么:https://github.com/Lilmortal.
- 我的项目 "HexGraph-source-connector" 在 指定目录并将图片路径推送到主题
- "HexGraph"项目捡起来,用Akka,演员们会得到 所有像素十六进制代码分别开始推送消息 转到另一个主题。
- "HexGraph-stream" 是我的 kafka 流部分。
但它很长,我怀疑你会读它哈哈。
不管怎样,我阅读一个主题时,都会收到这样的消息 {imagePath: {hexCode: #fff}}。 图片路径是key,hexCode是value。我可以有一对多的 imagePaths,所以我的想法是我的前端将有一个 websocket 来接收它。它将显示一个图像,并且在其顶部有一个条形图,其中包含像素颜色代码的数量。例如有4个#fff、28个#fef等
因此我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。
例如:
- {imagePath1: {hexCode: #fff, 计数: 47}}
- {imagePath1: {hexCode: #fef, 计数: 61}}
- {imagePath2: {hexCode: #fff, 计数: 23}}
- {imagePath2: {hexCode: #fef, 计数: 55}}
所以这里imagePath1有47个#fff,imagePath2有23个#fff。
这就是我在 atm 上尝试做的事情。
也许 select 在分组前通过复合键?像这样:
SteamsBuilder topology = new StreamsBuilder();
topology.stream("input")
.selectKey((k, v) -> k + v.hex)
.groupByKey()
.count()
这不是 groupBy 两次,而是让你得到想要的效果。
更新 评论后:
class Image {
public String imagePath;
}
class ImageAggregation {
public String imagePath;
public int count;
}
class ImageSerde implements Serde<Image> {
// implement
}
class ImageAggregationSerde implements Serde<ImageAggregation> {
// implement
}
KTable<String, ImageAggregation> table = topology
.stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
agg.count = agg.count + 1;
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());
更新 2 post 更新后:
class ImageHex {
public String imagePath;
public String hex;
}
class ImageHexAggregation {
public String imagePath;
public Map<String, Integer> counts;
}
class ImageHexSerde implements Serde<ImageHex> {
// implement
}
class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
// implement
}
KTable<String, ImageHexAggregation> table = topology
.stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageHexAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
agg.counts.put(v.hex, currentCount + 1));
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());