found 和 required 在以下错误中有什么区别

What is difference between found and required in following errors

在处理 Scala Kafka KeyValueMapper 实现时,出现以下错误。我不确定到底有什么区别。 感谢您的帮助。

代码:

  1. 我根据主题创建了一个 KTable

    val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2
    
    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)
    
    val ktwindow: KTable[Windowed[String], java.lang.Long] =
      ipandTime
        .groupByKey(Serdes.String(), Serdes.Long())
        .reduce(creducer, deduplicationWindow, "ktwindow-query")
    
  2. 当我尝试使用 Windowed[String] 键创建流时,使用 selectKey 方法时出现错误。 java 中的类似实现工作正常。

    val fStream = ktwindow
      .toStream()
      .selectKey(
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
    
[error]  found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]

[error]  required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]

找不到变量ipandTime,所以我用???代替了它,但这与实际问题没有任何关系。

正如我所说,如果 Java use-site 通配符的类型推断失败,那么只需添加显式类型参数即可。这是为 Kafka 1.1.0 编译的:

import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.KeyValue


object Q49594920 {

      val creducer: Reducer[java.lang.Long] =
      (v1, v2) => if (v1 > v2) v1 else v2

    val deduplicationWindow = TimeWindows
      .of(60000L * 10)
      .advanceBy(60000L)
      .until(60000L * 10)

    val ktwindow: KTable[Windowed[String], java.lang.Long] = ???
      // ipandTime // What's that? It's not defined anywhere!
      //   .groupByKey(Serdes.String(), Serdes.Long())
      //   .reduce(creducer, deduplicationWindow, "ktwindow-query")

    val fStream = ktwindow
      .toStream()
      .selectKey[KeyValue[String, java.lang.Long]](
        new KeyValueMapper[Windowed[String],
                           java.lang.Long,
                           KeyValue[String, java.lang.Long]] {
          override def apply(
              key: Windowed[String],
              value: java.lang.Long): KeyValue[String, java.lang.Long] = {
            new KeyValue(key.key(), value)
          }
        }
      )
}

selectKey 方法需要一个泛型类型参数 KR,所以我只是给了它具体类型 KeyValue[String, java.lang.Long],然后它就起作用了。