Kafka Streams - 与旧状态的聚合

Kafka Streams - Aggregation with old state

我有一个 KStream,其中包含来自主题 to1 的数据,如下所示:

T1-KEY -> {T1}
T2-KEY -> {T2}

和一个KTable,构造如下:

我正在使用 org.apache.kafka.streams.StreamsBuilder 从某个主题 to2[=83= 创建 KTable ] 看起来像这样:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

流然后被平面映射并按 Key s.t 分组。结果 KTable 看起来像这样:

T1 -> { ["B1", "B2"] }

稍后,主题 to2 中出现以下消息:

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

现在我希望我的 KTable 能够反映这些变化,看起来像这样:

T1 -> { ["B2"] }

但看起来像这样:

T1 -> { ["B1", "B2"] }

我注意到,在我的 Aggregator<Tx-KEY, Bx, Set<Bx>> 中,给出的最后一个参数是集合 ["B1", "B2"] 即使当我在聚合之前查看时我只得到一个匹配项 "B2".

我对聚合的理解有误还是这里发生了什么?

编辑

我想我缩小了范围:显然聚合的 Initializer 只在第一次 非常 时被调用 - 此后聚合总是收到 last aggregate 作为最后一个参数,例如

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

其中 Set<Bx> aggregate 在第一次调用(通过初始化器创建)时是 [],但在第二次调用时是 ["B1", "B2"]

有什么想法吗?

编辑 2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

编辑 3

不过我不能只有平面图,因为我必须组合多个 Ax 元素,例如

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

然后我期待一些喜欢的人

T1 -> { ["B1", "B2"] }

并在下一次迭代中,当消息

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

如我所愿

T1 -> { ["B1"] }

..

请注意,在您的聚合器中,您是如何只向聚合集中添加元素的。有了这个逻辑,你的集合(对于给定的键)永远不会缩小。在这种情况下,我认为你已经将流压平了太多。我建议您不要将其扁平化到您的消息采用 (Tx-KEY key, Bx value) 形式的程度,而是让它们始终保持其固定形式:(Tx-KEY key, Set<Bx> value)。你根本不需要聚合。 为此,我建议您转换输入集

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

进入

T1 -> { ["B1", "B2"] }

通过在 KStream 平面图方法调用中使用标准 java 代码(集合或流 api)按 "Rel" 字段分组,这样您就只会使用 [= KStream 上的 14=] 类型值,而不是单独的 Bx 类型值。

如果您提供当前平面图实现的代码,我们很乐意详细说明。