Flink Scala API 泛型参数函数
Flink Scala API functions on generic parameters
这是 上的后续问题。
我希望能够传递 Flink 的 DataSet
s 并用它做一些事情,但是数据集的参数是通用的。
这是我现在遇到的问题:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag
object TestFlink {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
val split = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
id(split).print()
env.execute()
}
def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}
我有 ds.map(r => r)
这个错误:
Multiple markers at this line
- not enough arguments for method map: (implicit evidence6: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit
evidence7: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence6, evidence7.
- not enough arguments for method map: (implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit evidence
: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence, evidence.
- could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[K]
当然,这里的id
函数只是一个例子,我希望能够用它做一些更复杂的事情。
如何解决?
您还需要将 TypeInformation 作为上下文绑定在 K 参数上,因此:
def id[K: ClassTag: TypeInformation](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
原因是,Flink 会分析您在程序中使用的类型,并为您使用的每种类型创建一个 TypeInformation 实例。如果要创建通用操作,则需要通过添加上下文绑定来确保该类型的 TypeInformation 可用。这样,Scala 编译器将确保实例在泛型函数的调用点可用。
这是
我希望能够传递 Flink 的 DataSet
s 并用它做一些事情,但是数据集的参数是通用的。
这是我现在遇到的问题:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag
object TestFlink {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
val split = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
id(split).print()
env.execute()
}
def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}
我有 ds.map(r => r)
这个错误:
Multiple markers at this line
- not enough arguments for method map: (implicit evidence6: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit
evidence7: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence6, evidence7.
- not enough arguments for method map: (implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[K], implicit evidence
: scala.reflect.ClassTag[K])org.apache.flink.api.scala.DataSet[K]. Unspecified value parameters evidence, evidence.
- could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[K]
当然,这里的id
函数只是一个例子,我希望能够用它做一些更复杂的事情。
如何解决?
您还需要将 TypeInformation 作为上下文绑定在 K 参数上,因此:
def id[K: ClassTag: TypeInformation](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
原因是,Flink 会分析您在程序中使用的类型,并为您使用的每种类型创建一个 TypeInformation 实例。如果要创建通用操作,则需要通过添加上下文绑定来确保该类型的 TypeInformation 可用。这样,Scala 编译器将确保实例在泛型函数的调用点可用。