Kafka Streams - 与旧状态的聚合
Kafka Streams - Aggregation with old state
我有一个 KStream,其中包含来自主题 to1 的数据,如下所示:
T1-KEY -> {T1}
T2-KEY -> {T2}
和一个KTable,构造如下:
我正在使用 org.apache.kafka.streams.StreamsBuilder 从某个主题 to2[=83= 创建 KTable ] 看起来像这样:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
流然后被平面映射并按 Key s.t 分组。结果 KTable 看起来像这样:
T1 -> { ["B1", "B2"] }
稍后,主题 to2 中出现以下消息:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
现在我希望我的 KTable 能够反映这些变化,看起来像这样:
T1 -> { ["B2"] }
但看起来像这样:
T1 -> { ["B1", "B2"] }
我注意到,在我的 Aggregator<Tx-KEY, Bx, Set<Bx>>
中,给出的最后一个参数是集合 ["B1", "B2"]
即使当我在聚合之前查看时我只得到一个匹配项 "B2"
.
我对聚合的理解有误还是这里发生了什么?
编辑
我想我缩小了范围:显然聚合的 Initializer
只在第一次 非常 时被调用 - 此后聚合总是收到 last aggregate
作为最后一个参数,例如
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
其中 Set<Bx> aggregate
在第一次调用(通过初始化器创建)时是 []
,但在第二次调用时是 ["B1", "B2"]
。
有什么想法吗?
编辑 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
编辑 3
不过我不能只有平面图,因为我必须组合多个 Ax 元素,例如
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
然后我期待一些喜欢的人
T1 -> { ["B1", "B2"] }
并在下一次迭代中,当消息
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
如我所愿
T1 -> { ["B1"] }
..
请注意,在您的聚合器中,您是如何只向聚合集中添加元素的。有了这个逻辑,你的集合(对于给定的键)永远不会缩小。在这种情况下,我认为你已经将流压平了太多。我建议您不要将其扁平化到您的消息采用 (Tx-KEY key, Bx value)
形式的程度,而是让它们始终保持其固定形式:(Tx-KEY key, Set<Bx> value)
。你根本不需要聚合。
为此,我建议您转换输入集
"Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
进入
T1 -> { ["B1", "B2"] }
通过在 KStream 平面图方法调用中使用标准 java 代码(集合或流 api)按 "Rel" 字段分组,这样您就只会使用 [= KStream 上的 14=] 类型值,而不是单独的 Bx
类型值。
如果您提供当前平面图实现的代码,我们很乐意详细说明。
我有一个 KStream,其中包含来自主题 to1 的数据,如下所示:
T1-KEY -> {T1}
T2-KEY -> {T2}
和一个KTable,构造如下:
我正在使用 org.apache.kafka.streams.StreamsBuilder 从某个主题 to2[=83= 创建 KTable ] 看起来像这样:
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
}
..
流然后被平面映射并按 Key s.t 分组。结果 KTable 看起来像这样:
T1 -> { ["B1", "B2"] }
稍后,主题 to2 中出现以下消息:
A1-KEY -> { "A1", "Set": [
{"B2", "Rel": "T1"}
]
}
现在我希望我的 KTable 能够反映这些变化,看起来像这样:
T1 -> { ["B2"] }
但看起来像这样:
T1 -> { ["B1", "B2"] }
我注意到,在我的 Aggregator<Tx-KEY, Bx, Set<Bx>>
中,给出的最后一个参数是集合 ["B1", "B2"]
即使当我在聚合之前查看时我只得到一个匹配项 "B2"
.
我对聚合的理解有误还是这里发生了什么?
编辑
我想我缩小了范围:显然聚合的 Initializer
只在第一次 非常 时被调用 - 此后聚合总是收到 last aggregate
作为最后一个参数,例如
@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {
}
其中 Set<Bx> aggregate
在第一次调用(通过初始化器创建)时是 []
,但在第二次调用时是 ["B1", "B2"]
。
有什么想法吗?
编辑 2
public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {
@Override
public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
aggregate.add(value);
return aggregate;
}
}
编辑 3
不过我不能只有平面图,因为我必须组合多个 Ax 元素,例如
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
},
A2-KEY -> { "A2", "Set": [
{"B2", "Rel": "T1"}
]
},
...
然后我期待一些喜欢的人
T1 -> { ["B1", "B2"] }
并在下一次迭代中,当消息
A1-KEY -> { "A1", "Set": [
{"B1", "Rel": "T1"}
]
}
如我所愿
T1 -> { ["B1"] }
..
请注意,在您的聚合器中,您是如何只向聚合集中添加元素的。有了这个逻辑,你的集合(对于给定的键)永远不会缩小。在这种情况下,我认为你已经将流压平了太多。我建议您不要将其扁平化到您的消息采用 (Tx-KEY key, Bx value)
形式的程度,而是让它们始终保持其固定形式:(Tx-KEY key, Set<Bx> value)
。你根本不需要聚合。
为此,我建议您转换输入集
"Set": [
{"B1", "Rel": "T1"},
{"B2", "Rel": "T1"}
]
进入
T1 -> { ["B1", "B2"] }
通过在 KStream 平面图方法调用中使用标准 java 代码(集合或流 api)按 "Rel" 字段分组,这样您就只会使用 [= KStream 上的 14=] 类型值,而不是单独的 Bx
类型值。
如果您提供当前平面图实现的代码,我们很乐意详细说明。