Kafka Stream Chained LeftJoin - 在新消息之后再次处理以前的旧消息
Kafka Stream Chained LeftJoin - Processing previous old message again after the new one
我有一个由其他流组成的流
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
.leftJoin(
compositeFundTable,
(CompositeImc cimc, CompositeFund cf) -> {
CompositeInfo newCandidate = new CompositeInfo();
if (cimc != null) {
newCandidate.imcName = cimc.imcName;
newCandidate.imcID = cimc.imcID;
if (cf != null) {
newCandidate.investments = cf.investments;
}
}
return newCandidate;
})
.leftJoin(
compositeGeographyTable,
(CompositeInfo cinfo, CompositeGeography cg) -> {
if (cg != null) {
cinfo.regions = cg.regions;
}
return cinfo;
})
.leftJoin(
compositeSectorTable,
(CompositeInfo cinfo, CompositeSector cs) -> {
if (cs != null) {
cinfo.sectors = cs.sectors;
}
return cinfo;
})
.leftJoin(
compositeClusterTable,
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
})
.leftJoin(
compositeAlphaClusterTable,
(CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
if (cac != null) {
cinfo.alphaClusters = cac.alphaClusters;
};
return cinfo;
},
Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
.withKeySerde(Serdes.Long())
.withValueSerde(compositeInfoSerde));
我的问题与 CompositeInfo 和 CustomCluster 之间的左连接有关。 CustomCluster 如下所示
KTable<Long, CustomCluster> compositeClusterTable = builder
.stream(
SUB_TOPIC_COMPOSITE_CLUSTER,
Consumed.with(Serdes.Long(), compositeClusterSerde))
.filter((k, v) -> v.clusters != null)
.groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
.reduce((aggValue, newValue) -> newValue);
自定义集群中的一条消息看起来像
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
所以我将此对象中的 HashMap 簇分配给在 compositeId 上连接的 CompositeInfo 对象中的簇。
我所看到的是,对于给定的 compositeId,一条 CustomCluster 消息进入并正确处理,但随后再次处理包含先前集群的旧消息(我仍在调查此消息)。
通过挖掘问题发生在 kafka 内部 KTableKTableRightJoin
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
return;
}
final R newValue;
R oldValue = null;
final V2 value2 = valueGetter.get(key);
if (value2 == null) {
return;
}
newValue = joiner.apply(change.newValue, value2);
if (sendOldValues) {
oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));
}
第一次加入 returns 时,newValue 会正确更新。但是代码然后转到 sendOldValues 块,一旦加入者 returns,newValue 就是更新增益,但这次是旧的集群值。
所以这是我的问题:
- 为什么当加入者被调用时 newValues 得到更新
第二次使用 oldValue
- 有没有办法关闭 sendOldValues
- 我的链式左连接是否与此有关。我知道
以前版本的 kafka 有一个链接错误。但现在我在
1.0
更新:
我发现的另一件事。如果我将连接移到连接链上并删除其他连接,则 sendOldValues 仍为 False。因此,如果我有类似以下内容:
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
.leftJoin(
compositeFundTable,
(CompositeImc cimc, CompositeFund cf) -> {
CompositeInfo newCandidate = new CompositeInfo();
if (cimc != null) {
newCandidate.imcName = cimc.imcName;
newCandidate.imcID = cimc.imcID;
if (cf != null) {
newCandidate.investments = cf.investments;
}
}
return newCandidate;
})
.leftJoin(
compositeClusterTable,
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
},
Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
.withKeySerde(Serdes.Long())
.withValueSerde(compositeInfoSerde));
这给了我正确的结果。但我认为,如果我在此之后放置更多的链接连接,它们可能会显示相同的错误行为。
此时我不确定任何事情,但我认为我的问题在于 chained leftjoin 和计算 oldValue 的行为。还有其他人 运行 处理过这个问题吗?
更新
经过大量挖掘,我意识到 sendOldValues 是 kafka 内部的,而不是我遇到的问题的原因。我的问题是,当 oldValue returns 的 ValueJoiner 时,newValue 会发生变化,我不知道它是否是由于某些通过引用分配传递给 Java 对象
这是传入对象的样子
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]
簇是一个HashSet<Cluster> clusters = new HashSet<Cluster>();
然后连接到一个对象
CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]
此处的簇是同一类型,但在 CompositeInfo 中 class
当我加入时,我将 CustomCluster 对象的集群分配给 CompositeInfo 对象
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
}
确实是引用传递问题。加入时,我需要初始化并 return 一个新对象,而不是为旧对象赋值。
答案基于 对问题的评论。
在我自己遇到同样的问题后,我想提供一个详细的答案以及一个有助于说明问题的简化示例。
@Bean
public Function<KTable<String, String>,
Function<KTable<String, String>, Consumer<KTable<String, String>>>> processEvents() {
return firstnames ->
lastnames ->
titles -> firstnames
.mapValues(firstname -> new Salutation().withFirstname(firstname))
.join(lastnames, (salutation, lastname) -> salutation.withLastname(lastname))
.leftJoin(titles, (salutation, title) -> salutation.withTitle(title))
.toStream()
.foreach((key, salutation) -> log.info("{}: {}", key, salutation));
}
该示例(使用 Spring Cloud Stream 和 Kafka Streams 绑定器)显示了一种常见模式,其中主题内容合并到一个累加器 object 中。在我们的例子中,通过连接代表名字、姓氏和(可选)标题的主题,称呼(例如“亲爱的史密斯女士”)是 accumulated/aggregated 到 Salutation
object。
需要注意的是,在此示例中,Salutation
实例是一个逐步构建的可变 object。当运行这样一段代码时,你会看到,当改变一个人的姓氏时,合并总是“运行后面”。这意味着,如果您因为 Smith 女士刚刚结婚并且现在被称为“Johnson”而发布姓氏事件,那么 Kafka Streams 将再次发出代表“Ms. Smith”的 Salutation
,尽管她已更改她的姓氏。只有当您针对姓氏主题(例如“Miller”)为同一个人发布另一个事件时,才会记录“亲爱的约翰逊女士”。
此行为的原因可在位于 KTableKTableInnerJoin.java
:
的一段代码中找到
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, valueRight);
}
if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, valueRight);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
joiner
是一个 ValueJoiner
,在我们的例子中可以是如上所示 (salutation, lastname) -> salutation.withLastname(lastname)
。这段代码的问题在于,如果您使用带有可变累加器 object 的累加模式(在我们的例子中是 Salutation
的实例),它(按设计)被所有连接重用,那么 oldValue
和 newValue
将是相同的 object。此外,由于 oldValue
是在之后计算的,它将包含旧姓氏,这解释了为什么 Spring Kafka 落后于 运行。
因此,由 ValueJoiner
编辑的 object return 每次都是一个新的 object,它不包含对其他可变 objects,它可能会被共享(因此会发生变异)。因此,最安全的方法是让 ValueJoiner
return 成为不可变的 object.
我不认为这是库的错误,因为它必须以某种方式比较旧状态和新状态,而且获取可变 object 的快照需要深拷贝。但是,在文档中提及它可能是值得的。另外,在 oldValue == newValue
时发出警告至少会让人们意识到这个问题。我会检查是否可以合并这些改进。
我有一个由其他流组成的流
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
.leftJoin(
compositeFundTable,
(CompositeImc cimc, CompositeFund cf) -> {
CompositeInfo newCandidate = new CompositeInfo();
if (cimc != null) {
newCandidate.imcName = cimc.imcName;
newCandidate.imcID = cimc.imcID;
if (cf != null) {
newCandidate.investments = cf.investments;
}
}
return newCandidate;
})
.leftJoin(
compositeGeographyTable,
(CompositeInfo cinfo, CompositeGeography cg) -> {
if (cg != null) {
cinfo.regions = cg.regions;
}
return cinfo;
})
.leftJoin(
compositeSectorTable,
(CompositeInfo cinfo, CompositeSector cs) -> {
if (cs != null) {
cinfo.sectors = cs.sectors;
}
return cinfo;
})
.leftJoin(
compositeClusterTable,
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
})
.leftJoin(
compositeAlphaClusterTable,
(CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
if (cac != null) {
cinfo.alphaClusters = cac.alphaClusters;
};
return cinfo;
},
Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
.withKeySerde(Serdes.Long())
.withValueSerde(compositeInfoSerde));
我的问题与 CompositeInfo 和 CustomCluster 之间的左连接有关。 CustomCluster 如下所示
KTable<Long, CustomCluster> compositeClusterTable = builder
.stream(
SUB_TOPIC_COMPOSITE_CLUSTER,
Consumed.with(Serdes.Long(), compositeClusterSerde))
.filter((k, v) -> v.clusters != null)
.groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
.reduce((aggValue, newValue) -> newValue);
自定义集群中的一条消息看起来像
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
所以我将此对象中的 HashMap 簇分配给在 compositeId 上连接的 CompositeInfo 对象中的簇。
我所看到的是,对于给定的 compositeId,一条 CustomCluster 消息进入并正确处理,但随后再次处理包含先前集群的旧消息(我仍在调查此消息)。 通过挖掘问题发生在 kafka 内部 KTableKTableRightJoin
public void process(final K key, final Change<V1> change) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
if (key == null) {
return;
}
final R newValue;
R oldValue = null;
final V2 value2 = valueGetter.get(key);
if (value2 == null) {
return;
}
newValue = joiner.apply(change.newValue, value2);
if (sendOldValues) {
oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));
}
第一次加入 returns 时,newValue 会正确更新。但是代码然后转到 sendOldValues 块,一旦加入者 returns,newValue 就是更新增益,但这次是旧的集群值。
所以这是我的问题:
- 为什么当加入者被调用时 newValues 得到更新 第二次使用 oldValue
- 有没有办法关闭 sendOldValues
- 我的链式左连接是否与此有关。我知道 以前版本的 kafka 有一个链接错误。但现在我在 1.0
更新: 我发现的另一件事。如果我将连接移到连接链上并删除其他连接,则 sendOldValues 仍为 False。因此,如果我有类似以下内容:
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
.leftJoin(
compositeFundTable,
(CompositeImc cimc, CompositeFund cf) -> {
CompositeInfo newCandidate = new CompositeInfo();
if (cimc != null) {
newCandidate.imcName = cimc.imcName;
newCandidate.imcID = cimc.imcID;
if (cf != null) {
newCandidate.investments = cf.investments;
}
}
return newCandidate;
})
.leftJoin(
compositeClusterTable,
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
},
Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
.withKeySerde(Serdes.Long())
.withValueSerde(compositeInfoSerde));
这给了我正确的结果。但我认为,如果我在此之后放置更多的链接连接,它们可能会显示相同的错误行为。
此时我不确定任何事情,但我认为我的问题在于 chained leftjoin 和计算 oldValue 的行为。还有其他人 运行 处理过这个问题吗?
更新
经过大量挖掘,我意识到 sendOldValues 是 kafka 内部的,而不是我遇到的问题的原因。我的问题是,当 oldValue returns 的 ValueJoiner 时,newValue 会发生变化,我不知道它是否是由于某些通过引用分配传递给 Java 对象
这是传入对象的样子
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]
簇是一个HashSet<Cluster> clusters = new HashSet<Cluster>();
然后连接到一个对象
CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]
此处的簇是同一类型,但在 CompositeInfo 中 class
当我加入时,我将 CustomCluster 对象的集群分配给 CompositeInfo 对象
(CompositeInfo cinfo, CustomCluster cc) -> {
if (cc != null && cc.clusters != null) {
cinfo.clusters = cc.clusters;
}
return cinfo;
}
确实是引用传递问题。加入时,我需要初始化并 return 一个新对象,而不是为旧对象赋值。
答案基于
在我自己遇到同样的问题后,我想提供一个详细的答案以及一个有助于说明问题的简化示例。
@Bean
public Function<KTable<String, String>,
Function<KTable<String, String>, Consumer<KTable<String, String>>>> processEvents() {
return firstnames ->
lastnames ->
titles -> firstnames
.mapValues(firstname -> new Salutation().withFirstname(firstname))
.join(lastnames, (salutation, lastname) -> salutation.withLastname(lastname))
.leftJoin(titles, (salutation, title) -> salutation.withTitle(title))
.toStream()
.foreach((key, salutation) -> log.info("{}: {}", key, salutation));
}
该示例(使用 Spring Cloud Stream 和 Kafka Streams 绑定器)显示了一种常见模式,其中主题内容合并到一个累加器 object 中。在我们的例子中,通过连接代表名字、姓氏和(可选)标题的主题,称呼(例如“亲爱的史密斯女士”)是 accumulated/aggregated 到 Salutation
object。
需要注意的是,在此示例中,Salutation
实例是一个逐步构建的可变 object。当运行这样一段代码时,你会看到,当改变一个人的姓氏时,合并总是“运行后面”。这意味着,如果您因为 Smith 女士刚刚结婚并且现在被称为“Johnson”而发布姓氏事件,那么 Kafka Streams 将再次发出代表“Ms. Smith”的 Salutation
,尽管她已更改她的姓氏。只有当您针对姓氏主题(例如“Miller”)为同一个人发布另一个事件时,才会记录“亲爱的约翰逊女士”。
此行为的原因可在位于 KTableKTableInnerJoin.java
:
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, valueRight);
}
if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, valueRight);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
joiner
是一个 ValueJoiner
,在我们的例子中可以是如上所示 (salutation, lastname) -> salutation.withLastname(lastname)
。这段代码的问题在于,如果您使用带有可变累加器 object 的累加模式(在我们的例子中是 Salutation
的实例),它(按设计)被所有连接重用,那么 oldValue
和 newValue
将是相同的 object。此外,由于 oldValue
是在之后计算的,它将包含旧姓氏,这解释了为什么 Spring Kafka 落后于 运行。
因此,由 ValueJoiner
编辑的 object return 每次都是一个新的 object,它不包含对其他可变 objects,它可能会被共享(因此会发生变异)。因此,最安全的方法是让 ValueJoiner
return 成为不可变的 object.
我不认为这是库的错误,因为它必须以某种方式比较旧状态和新状态,而且获取可变 object 的快照需要深拷贝。但是,在文档中提及它可能是值得的。另外,在 oldValue == newValue
时发出警告至少会让人们意识到这个问题。我会检查是否可以合并这些改进。