Why is this KStream/KTable topology propagating records that don't pass the filter?
I have the following topology:
- Create a state store
- Filter records based on SOME_CONDITION, map their values to a new entity, and finally publish those records to another topic, STATIONS_LOW_CAPACITY_TOPIC
However, this is what I see on STATIONS_LOW_CAPACITY_TOPIC:
� null
� null
� null
� {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
� {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
� null
In other words, it looks like it is also publishing the records that did not pass the filter to the STATIONS_LOW_CAPACITY_TOPIC topic. How is that possible? How can I prevent them from being published?
Here is the Kafka Streams code:
kStream.groupByKey()
    .reduce({ _, newValue -> newValue },
        Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(stationSerde))
    .filter { _, value -> SOME_CONDITION }
    .mapValues { station ->
        Stats(XXX)
    }
    .toStream()
    .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))
Update: I simplified the topology and printed the resulting table. For some reason, the final KTable also contains null-valued records corresponding to the upstream records that did not pass the filter:
kStream.groupByKey()
    .reduce({ _, newValue -> newValue },
        Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(stationSerde))
    .filter { _, value ->
        val conditionResult = (SOME_CONDITION)
        println(conditionResult)
        conditionResult
    }
    .print()
Log:
false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)
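As an aside: KTable#print was later deprecated and removed, so on newer Kafka Streams versions an equivalent way to peek at the filtered table is to go through a stream first. A minimal sketch, where filteredTable is just a placeholder name for the KTable returned by the filter above:

filteredTable
    .toStream()
    .print(Printed.toSysOut<Int, BiciMadStation>().withLabel("filtered-stations"))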
The answer is in the javadoc of KTable.filter(...):
Note that filter for a changelog stream works different to record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
That explains why I was seeing null-valued (tombstone) records being sent downstream.
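To see this behaviour in isolation, here is a minimal, self-contained sketch (my own, not from the original post) that reproduces it with TopologyTestDriver; the topic names, the plain Int values and the `capacity < 3` predicate are stand-ins for the original Station entity and SOME_CONDITION:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import java.util.Properties

fun main() {
    val builder = StreamsBuilder()

    // Same shape as the topology above: KStream -> groupByKey -> reduce -> KTable.filter
    builder.stream("stations-in", Consumed.with(Serdes.Integer(), Serdes.Integer()))
        .groupByKey()
        .reduce { _, newValue -> newValue }
        .filter { _, capacity -> capacity < 3 }   // stand-in for SOME_CONDITION
        .toStream()
        .to("stations-out", Produced.with(Serdes.Integer(), Serdes.Integer()))

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-filter-tombstone-demo")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)   // forward every table update immediately
    }

    TopologyTestDriver(builder.build(), props).use { driver ->
        val input = driver.createInputTopic(
            "stations-in", Serdes.Integer().serializer(), Serdes.Integer().serializer())
        val output = driver.createOutputTopic(
            "stations-out", Serdes.Integer().deserializer(), Serdes.Integer().deserializer())

        input.pipeInput(1, 10)   // fails the predicate -> a tombstone (1, null) is forwarded
        input.pipeInput(2, 1)    // passes the predicate -> (2, 1) is forwarded

        output.readKeyValuesToList().forEach { println("${it.key} -> ${it.value}") }
        // prints:
        // 1 -> null
        // 2 -> 1
    }
}

With caching left at its default the forwarding may only be deferred until the cache flushes; the tombstones still reach the output topic.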
To avoid it, I converted the KTable to a KStream and only then applied the filter:
kStream.groupByKey()
    .reduce({ _, newValue -> newValue },
        Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(stationSerde))
    .toStream()                                // convert to a KStream first...
    .filter { _, value -> SOME_CONDITION }     // ...so the filter simply drops records instead of forwarding tombstones
    .mapValues { station ->
        StationStats(station.id, station.latitude, station.longitude, ...)
    }
    .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))
Result:
4 {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5 {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...
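Another option along the same lines (my own sketch, not from the original answer, and not verified against this setup): keep the filter on the KTable and simply drop the tombstones it emits after converting to a stream, using filterNot just before publishing:

kStream.groupByKey()
    .reduce({ _, newValue -> newValue },
        Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
            .withKeySerde(Serdes.Integer())
            .withValueSerde(stationSerde))
    .filter { _, value -> SOME_CONDITION }      // KTable filter: emits tombstones for dropped keys
    .toStream()
    .filterNot { _, value -> value == null }    // drop the tombstones before mapping/publishing
    .mapValues { station ->
        StationStats(station.id, station.latitude, station.longitude, ...)
    }
    .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))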