Apache Flink Streaming type mismatch in flatMap function
I'm trying to use the streaming API of Flink 0.10.0 with Scala 2.10.4. When I try to compile this first version:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)
    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] {
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })
    env.execute("Window Stream wordcount")
  }
}
I get a compile-time error:
[error] found : String => TraversableOnce[String]
[error] required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error] new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error] ^
In the decompiled version of the DataStream.class that I include in my project, there are functions that accept this type (the last one):
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence, ClassTag<R> evidence) {
    if (flatMapper == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }
    TypeInformation outType = (TypeInformation)Predef..MODULE$.implicitly(evidence);
    return package..MODULE$.javaToScalaStream((org.apache.flink.streaming.api.datastream.DataStream)this.javaStream.flatMap(flatMapper).returns(outType));
}

public <R> DataStream<R> flatMap(Function2<T, Collector<R>, BoxedUnit> fun, TypeInformation<R> evidence, ClassTag<R> evidence) {
    if (fun == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }
    Function2<T, Collector<R>, BoxedUnit> cleanFun = this.clean((F)fun);
    .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
    return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence, evidence);
}

public <R> DataStream<R> flatMap(Function1<T, TraversableOnce<R>> fun, TypeInformation<R> evidence, ClassTag<R> evidence) {
    if (fun == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }
    Function1<T, TraversableOnce<R>> cleanFun = this.clean((F)fun);
    .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
    return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence, evidence);
}
What could be wrong here? I'd appreciate any insight.
Thanks in advance.
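The problem is that you are importing Flink's Java StreamExecutionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. The streams it creates are Java DataStreams, whose flatMap() only accepts a FlatMapFunction, which is why the compiler rejects the Scala function.
You have to use the Scala variant of StreamExecutionEnvironment instead: import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
With this change, everything builds successfully!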
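For reference, here is a minimal sketch of the question's program with the Scala environment. It assumes the wildcard import org.apache.flink.streaming.api.scala._ (which brings in the Scala StreamExecutionEnvironment, the Scala DataStream, and the implicit TypeInformation that flatMap needs); the print() sink is an addition for illustration and was not part of the original code.

```scala
// Wildcard import: Scala StreamExecutionEnvironment, Scala DataStream,
// and the implicits that supply TypeInformation for flatMap.
import org.apache.flink.streaming.api.scala._

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // With the Scala environment this is a Scala DataStream[String].
    val text: DataStream[String] = env.socketTextStream("localhost", 9999)

    // A plain Scala function is now accepted; the Array[String] from split
    // is treated as a TraversableOnce[String].
    val words: DataStream[String] = text.flatMap { _.split(" ") }

    // Assumption: print the words so the job has a visible sink.
    words.print()

    env.execute("Window Stream wordcount")
  }
}
```

To try it, something has to be listening on localhost:9999 before the job starts (for example `nc -lk 9999`), as with the original program.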
Original answer:
The problem is that you are passing a Function to the flatMap() method, whereas flatMap() expects a FlatMapFunction.
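import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
val words : DataStream[String] = text.flatMap[String](
  new FlatMapFunction[String,String] {
    // Results must be emitted through the collector; a returned value is discarded.
    override def flatMap(t: String, collector: Collector[String]): Unit = t.split(" ").foreach(collector.collect)
  })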
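Once the Scala environment is in use, the two function-based overloads shown in the decompiled DataStream class can be called directly, so an explicit FlatMapFunction is optional. The following is only a sketch under that assumption; the object name FlatMapVariants, the job name, and the print() sinks are made up for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical object name, for illustration only.
object FlatMapVariants {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Overload taking (T, Collector[R]) => Unit: emit each word explicitly.
    val viaCollector: DataStream[String] = text.flatMap {
      (line: String, out: Collector[String]) => line.split(" ").foreach(out.collect)
    }

    // Overload taking T => TraversableOnce[R]: return the words as a collection.
    val viaTraversable: DataStream[String] = text.flatMap {
      (line: String) => line.split(" ").toSeq
    }

    // Assumption: print both streams so the results are visible.
    viaCollector.print()
    viaTraversable.print()
    env.execute("flatMap variants")
  }
}
```

Both variants produce the same stream of words; the collector form is useful when a record may emit results conditionally, while the collection form is shorter for simple splits.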