Kafka 流与 Scala
Kafka streams with Scala
我正在尝试将 kafka 流与 scala 一起使用
下面是我在 Java 中的代码,它工作得很好:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
我的scala代码如下:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
scala 代码抛出编译错误。类型不匹配 expected:ForEachAction[>String,>String],Actual((any,any), Unit)
不是 found:value 键
未找到:值值
有谁知道如何在 Scala 中使用流 API
你的语法有误:)。 ->
只是创建对的运算符,所以表达式
(key,value)-> {
println(key)
}
具有类型 ((Any, Any), Unit),因为编译器无法推断任何类型信息(并且缺少 key
和 value
)
如果您使用的是 scala 2.12,将 ->
替换为 =>
应该可以解决问题,但是如果您使用的是旧版本的 scala,则必须显式实现 java 双功能:
textLines.foreach(new BiFunction[T1, T2] { ... })
可以直接使用print方法打印kafkastream
textlines.print
它将打印kafka流。您甚至可以通过将参数传递给打印函数来打印键或值。
我正在尝试将 kafka 流与 scala 一起使用 下面是我在 Java 中的代码,它工作得很好:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
我的scala代码如下:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
scala 代码抛出编译错误。类型不匹配 expected:ForEachAction[>String,>String],Actual((any,any), Unit) 不是 found:value 键 未找到:值值
有谁知道如何在 Scala 中使用流 API
你的语法有误:)。 ->
只是创建对的运算符,所以表达式
(key,value)-> {
println(key)
}
具有类型 ((Any, Any), Unit),因为编译器无法推断任何类型信息(并且缺少 key
和 value
)
如果您使用的是 scala 2.12,将 ->
替换为 =>
应该可以解决问题,但是如果您使用的是旧版本的 scala,则必须显式实现 java 双功能:
textLines.foreach(new BiFunction[T1, T2] { ... })
可以直接使用print方法打印kafkastream
textlines.print
它将打印kafka流。您甚至可以通过将参数传递给打印函数来打印键或值。