Kafka Streams API: Session Window 不兼容的类型
Kafka Streams API: Session Window incompatible types
我有以下片段:
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<byte[], byte[]> mergedTable =
groupedStream
.reduce((aggregateValue, newValue) -> {
try {
Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
aggregateMap.forEach(recentMap::putIfAbsent);
newValue = MAPPER.writeValueAsString(recentMap).getBytes();
} catch (Exception e) {
LOG.warn("Couldn't aggregate key grouped stream\n", e);
}
return newValue;
}, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
.suppress(Suppressed.untilWindowCloses(unbounded()));
我收到以下编译异常:
Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>
我知道如果我像这样内联 windowedBy
:
KTable<Windowed<byte[]>, byte[]> mergedTable =
groupedStream
.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
.reduce((aggregateValue, newValue) -> {
try {
Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
aggregateMap.forEach(recentMap::putIfAbsent);
newValue = MAPPER.writeValueAsString(recentMap).getBytes();
} catch (Exception e) {
LOG.warn("Couldn't aggregate key grouped stream\n", e);
}
return newValue;
}, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
.suppress(Suppressed.untilWindowCloses(unbounded()));
它有效,但我不确定如何将这两个分开...
这里有两个问题。
第一个问题是 KGroupedStream.windowedBy(SessionWindows) returns SessionWindowedKStream<K, V>
的实例,在您的第一个示例中
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
您没有在变量中捕获返回的 SessionWindowedKStream
。
第二个问题是在您的第一个代码示例中
KTable<byte[], byte[]> mergedTable
什么时候应该
KTable<Windowed<byte[]>, byte[]> mergedTable
就像你的第二个例子一样。
如果将代码更改为
SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<Windowed<byte[]>, byte[]> mergedTable =
sessionWindowedKStream
.reduce((aggregateValue, newValue) -> {...
那么它应该可以正常编译。
HTH
比尔
我有以下片段:
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<byte[], byte[]> mergedTable =
groupedStream
.reduce((aggregateValue, newValue) -> {
try {
Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
aggregateMap.forEach(recentMap::putIfAbsent);
newValue = MAPPER.writeValueAsString(recentMap).getBytes();
} catch (Exception e) {
LOG.warn("Couldn't aggregate key grouped stream\n", e);
}
return newValue;
}, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
.suppress(Suppressed.untilWindowCloses(unbounded()));
我收到以下编译异常:
Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>
我知道如果我像这样内联 windowedBy
:
KTable<Windowed<byte[]>, byte[]> mergedTable =
groupedStream
.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
.reduce((aggregateValue, newValue) -> {
try {
Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
aggregateMap.forEach(recentMap::putIfAbsent);
newValue = MAPPER.writeValueAsString(recentMap).getBytes();
} catch (Exception e) {
LOG.warn("Couldn't aggregate key grouped stream\n", e);
}
return newValue;
}, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
.suppress(Suppressed.untilWindowCloses(unbounded()));
它有效,但我不确定如何将这两个分开...
这里有两个问题。
第一个问题是 KGroupedStream.windowedBy(SessionWindows) returns SessionWindowedKStream<K, V>
的实例,在您的第一个示例中
groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
您没有在变量中捕获返回的 SessionWindowedKStream
。
第二个问题是在您的第一个代码示例中
KTable<byte[], byte[]> mergedTable
什么时候应该
KTable<Windowed<byte[]>, byte[]> mergedTable
就像你的第二个例子一样。
如果将代码更改为
SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
KTable<Windowed<byte[]>, byte[]> mergedTable =
sessionWindowedKStream
.reduce((aggregateValue, newValue) -> {...
那么它应该可以正常编译。
HTH 比尔