found 和 required 在以下错误中有什么区别
What is difference between found and required in following errors
在处理 Scala Kafka KeyValueMapper
实现时,出现以下错误。我不确定到底有什么区别。
感谢您的帮助。
代码:
我根据主题创建了一个 KTable
。
val creducer: Reducer[java.lang.Long] =
(v1, v2) => if (v1 > v2) v1 else v2
val deduplicationWindow = TimeWindows
.of(60000L * 10)
.advanceBy(60000L)
.until(60000L * 10)
val ktwindow: KTable[Windowed[String], java.lang.Long] =
ipandTime
.groupByKey(Serdes.String(), Serdes.Long())
.reduce(creducer, deduplicationWindow, "ktwindow-query")
当我尝试使用 Windowed[String]
键创建流时,使用 selectKey 方法时出现错误。 java 中的类似实现工作正常。
val fStream = ktwindow
.toStream()
.selectKey(
new KeyValueMapper[Windowed[String],
java.lang.Long,
KeyValue[String, java.lang.Long]] {
override def apply(
key: Windowed[String],
value: java.lang.Long): KeyValue[String, java.lang.Long] = {
new KeyValue(key.key(), value)
}
}
)
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
找不到变量ipandTime
,所以我用???
代替了它,但这与实际问题没有任何关系。
正如我所说,如果 Java use-site 通配符的类型推断失败,那么只需添加显式类型参数即可。这是为 Kafka 1.1.0 编译的:
import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue
object Q49594920 {
val creducer: Reducer[java.lang.Long] =
(v1, v2) => if (v1 > v2) v1 else v2
val deduplicationWindow = TimeWindows
.of(60000L * 10)
.advanceBy(60000L)
.until(60000L * 10)
val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
// ipandTime // What's that? It's not defined anywhere!
// .groupByKey(Serdes.String(), Serdes.Long())
// .reduce(creducer, deduplicationWindow, "ktwindow-query")
val fStream = ktwindow
.toStream()
.selectKey[KeyValue[String, java.lang.Long]](
new KeyValueMapper[Windowed[String],
java.lang.Long,
KeyValue[String, java.lang.Long]] {
override def apply(
key: Windowed[String],
value: java.lang.Long): KeyValue[String, java.lang.Long] = {
new KeyValue(key.key(), value)
}
}
)
}
selectKey
方法需要一个泛型类型参数 KR
,所以我只是给了它具体类型 KeyValue[String, java.lang.Long]
,然后它就起作用了。
在处理 Scala Kafka KeyValueMapper
实现时,出现以下错误。我不确定到底有什么区别。
感谢您的帮助。
代码:
我根据主题创建了一个
KTable
。val creducer: Reducer[java.lang.Long] = (v1, v2) => if (v1 > v2) v1 else v2 val deduplicationWindow = TimeWindows .of(60000L * 10) .advanceBy(60000L) .until(60000L * 10) val ktwindow: KTable[Windowed[String], java.lang.Long] = ipandTime .groupByKey(Serdes.String(), Serdes.Long()) .reduce(creducer, deduplicationWindow, "ktwindow-query")
当我尝试使用
Windowed[String]
键创建流时,使用 selectKey 方法时出现错误。 java 中的类似实现工作正常。val fStream = ktwindow .toStream() .selectKey( new KeyValueMapper[Windowed[String], java.lang.Long, KeyValue[String, java.lang.Long]] { override def apply( key: Windowed[String], value: java.lang.Long): KeyValue[String, java.lang.Long] = { new KeyValue(key.key(), value) } } )
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
找不到变量ipandTime
,所以我用???
代替了它,但这与实际问题没有任何关系。
正如我所说,如果 Java use-site 通配符的类型推断失败,那么只需添加显式类型参数即可。这是为 Kafka 1.1.0 编译的:
import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue
object Q49594920 {
val creducer: Reducer[java.lang.Long] =
(v1, v2) => if (v1 > v2) v1 else v2
val deduplicationWindow = TimeWindows
.of(60000L * 10)
.advanceBy(60000L)
.until(60000L * 10)
val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
// ipandTime // What's that? It's not defined anywhere!
// .groupByKey(Serdes.String(), Serdes.Long())
// .reduce(creducer, deduplicationWindow, "ktwindow-query")
val fStream = ktwindow
.toStream()
.selectKey[KeyValue[String, java.lang.Long]](
new KeyValueMapper[Windowed[String],
java.lang.Long,
KeyValue[String, java.lang.Long]] {
override def apply(
key: Windowed[String],
value: java.lang.Long): KeyValue[String, java.lang.Long] = {
new KeyValue(key.key(), value)
}
}
)
}
selectKey
方法需要一个泛型类型参数 KR
,所以我只是给了它具体类型 KeyValue[String, java.lang.Long]
,然后它就起作用了。