Kafka 使用 Kotlin 流式传输物化视图
Kafka Streams Materialized View with Kotlin
要在 Java 中创建一个 kafka 流状态存储,我可以这样做:
final KGroupedStream<String, String> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word);
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(WORD_COUNT_STORE));
我正在尝试将其转换为 Kotlin,如下所示:
val wordCounts: KGroupedStream<String, String> = textLines
.flatMapValues({value -> value.split("\W+") })
.groupBy({ _, word -> word})
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, Array<Byte>>>as(WORD_COUNT_STORE))
但是,我收到以下编译器错误:
Interface KeyValueStore does not have constructors
我需要做什么?
由于 as
是 Kotlin 中的保留字,请尝试用反引号将 as
括起来,即
`as`
如果它对其他人有用,以及 Raman 建议的反引号,我必须进行其他一些更改:
- 首先,需要在
as
方法之后指定泛型类型,而不是直接在 Materialized
class. 之后指定
- 其次,我不得不使用
ByteArray
. 而不是使用 Array<Byte>
所以对我有用的完整代码行是:
wordCounts.count(Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>(WORD_COUNT_STORE))
要在 Java 中创建一个 kafka 流状态存储,我可以这样做:
final KGroupedStream<String, String> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word);
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(WORD_COUNT_STORE));
我正在尝试将其转换为 Kotlin,如下所示:
val wordCounts: KGroupedStream<String, String> = textLines
.flatMapValues({value -> value.split("\W+") })
.groupBy({ _, word -> word})
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, Array<Byte>>>as(WORD_COUNT_STORE))
但是,我收到以下编译器错误:
Interface KeyValueStore does not have constructors
我需要做什么?
由于 as
是 Kotlin 中的保留字,请尝试用反引号将 as
括起来,即
`as`
如果它对其他人有用,以及 Raman 建议的反引号,我必须进行其他一些更改:
- 首先,需要在
as
方法之后指定泛型类型,而不是直接在Materialized
class. 之后指定
- 其次,我不得不使用
ByteArray
. 而不是使用
Array<Byte>
所以对我有用的完整代码行是:
wordCounts.count(Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>(WORD_COUNT_STORE))