"Input mismatch: Tuple type expected" 在 PatternStream 上尝试 select
"Input mismatch: Tuple type expected" when trying to select on PatternStream
我在测试新的 Flink 1.0.0 功能时遇到了一些麻烦。我一直在修补 CEP,但我还没有设法 运行 一个简单的演示代码:
val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);
class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
pattern.get("start")
}
}
val alerts = patternStream.select(new MyPatternSelectFunction())
代码编译良好,maven 没有显示警告。 TrafficEvent 是一个 class,只有几个简单的字段,stream 是一个 class 的 Scala DataStream。当代码在 Flink 上 运行ning 时出现错误。它 运行s 一秒钟,然后代码退出并显示此错误消息:
程序已完成,但出现以下异常:
Input mismatch: Tuple type expected.
org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
com.demo.DemoTraffic$.main(DemoTraffic.scala:311)
我试图通过像这样构建静态 class 来将功能移动到 Java(也许从 Scala 调用 API 时会出现一些奇怪的问题):
public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
@Override
public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
return pattern.get("start");
}
});
return rvalue;
}
但结果完全一样,在PatternStream.select行抛出同样的错误。关于我可以尝试什么或我做错了什么的任何提示?如您所见,该模式非常愚蠢,它仅用于测试目的。它只接受所有事件,并返回该事件作为响应。 Flink是1.0.0,使用Scala 2.10版本。
谢谢
我假设 TrafficEvent
是 Scala 案例 class。 CEP 库是为 Flink 的 Java API 编写的,因此尚不支持 Scala 案例 classes。
作为解决方法,您可以将您的案例 class 转换为普通的 Scala class。
还有一个 JIRA ticket 跟踪 CEP Scala API 的开发。
我在测试新的 Flink 1.0.0 功能时遇到了一些麻烦。我一直在修补 CEP,但我还没有设法 运行 一个简单的演示代码:
val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);
class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
pattern.get("start")
}
}
val alerts = patternStream.select(new MyPatternSelectFunction())
代码编译良好,maven 没有显示警告。 TrafficEvent 是一个 class,只有几个简单的字段,stream 是一个 class 的 Scala DataStream。当代码在 Flink 上 运行ning 时出现错误。它 运行s 一秒钟,然后代码退出并显示此错误消息:
程序已完成,但出现以下异常:
Input mismatch: Tuple type expected.
org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
com.demo.DemoTraffic$.main(DemoTraffic.scala:311)
我试图通过像这样构建静态 class 来将功能移动到 Java(也许从 Scala 调用 API 时会出现一些奇怪的问题):
public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
@Override
public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
return pattern.get("start");
}
});
return rvalue;
}
但结果完全一样,在PatternStream.select行抛出同样的错误。关于我可以尝试什么或我做错了什么的任何提示?如您所见,该模式非常愚蠢,它仅用于测试目的。它只接受所有事件,并返回该事件作为响应。 Flink是1.0.0,使用Scala 2.10版本。
谢谢
我假设 TrafficEvent
是 Scala 案例 class。 CEP 库是为 Flink 的 Java API 编写的,因此尚不支持 Scala 案例 classes。
作为解决方法,您可以将您的案例 class 转换为普通的 Scala class。
还有一个 JIRA ticket 跟踪 CEP Scala API 的开发。