Flink CEP unknown error alerted by IntelliJ IDE
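I have started exploring Apache Flink's CEP library in Scala. When I try to create a PatternStream by calling CEP.pattern(input,pattern), as shown in the tutorial at https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html, the IDE reports "Cannot resolve overloaded method" on the pattern method. Judging by the implementations of readTextFile and Pattern[String].begin('line').where(_.length == 10), which I use to create the input and the pattern respectively, there should be nothing wrong with the method's arguments or generic types.
This is the code I wrote. I know it is not finished, but I cannot finish it while this problem persists.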
package FlinkCEPClasses

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class FlinkCEPPipeline {
  var props : Properties = new Properties()
  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")
  var patt : Pattern[String,String] = Pattern.begin[String]("igual").where(_.length == 10)
  // Problem appears at the following line. A red underline appears at the pattern method,
  // saying the following: "Cannot resolve overloaded method"
  var CEPstream = CEP.pattern(input,patt)
  input.writeAsText("/home/luca/Desktop/flinkcepout",FileSystem.WriteMode.OVERWRITE)
  env.execute()
}
This is the content of my build.sbt file:
name := "FlinkCEP"
version := "0.1"
scalaVersion := "2.12.10"
// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"
libraryDependencies += "log4j" % "log4j" % "1.2.17"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test
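The only purpose of this code is to see it run with a simple "where" condition; beyond that it is not meant to be of any greater use. I am using IntelliJ as my IDE. Also, I am not sure whether the CEP library can be used from Scala at all. I would appreciate it if anyone could shed some light on this.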
Try this:
import org.apache.flink.cep.scala.PatternStream
...
val CEPstream: PatternStream[String] = CEP.pattern[String](input, patt)
For a simple example of using CEP with Scala, see GitHub.
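Well, after looking at @DavidAnderson's GitHub example, I finally solved the problem. Since I made a lot of changes, I cannot be sure my solution will work for you, but I changed import org.apache.flink.streaming.api.datastream.DataStream to import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}. Watch out for ambiguous imports and make sure you are importing the classes you actually need.
I will list all my imports and my build.sbt file so that you have full access to my configuration.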
Imports
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}
build.sbt
name := "FlinkCEP"
version := "0.1"
scalaVersion := "2.12.10"
// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
//libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-cep
libraryDependencies += "org.apache.flink" %% "flink-cep" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-runtime
libraryDependencies += "org.apache.flink" %% "flink-runtime" % "1.9.0" % Test
// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"
libraryDependencies += "log4j" % "log4j" % "1.2.17"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test
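Putting the imports above together, a complete pipeline might look like the following. This is only a minimal sketch, assuming the Flink 1.9.0 Scala API from the build.sbt above; the object name FlinkCEPPipelineSketch and the use of process with a PatternProcessFunction are my own additions, not part of the original code. It also leaves out the event-time setting from the question, since no timestamps or watermarks are assigned there. A likely reason the original call did not resolve is that it mixed the Java classes org.apache.flink.cep.CEP and org.apache.flink.streaming.api.datastream.DataStream with the Scala Pattern class; here everything comes from the Scala packages.

```scala
import java.util.{List => JList, Map => JMap}

import org.apache.flink.cep.functions.PatternProcessFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical name; any object with a main method would do.
object FlinkCEPPipelineSketch {
  def main(args: Array[String]): Unit = {
    // Scala StreamExecutionEnvironment, so readTextFile returns a Scala DataStream.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val input: DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")

    // Scala Pattern: match any line that is exactly 10 characters long.
    val patt: Pattern[String, String] =
      Pattern.begin[String]("igual").where(_.length == 10)

    // Scala CEP object, Scala DataStream, Scala Pattern: the types line up.
    val cepStream: PatternStream[String] = CEP.pattern(input, patt)

    // Emit the matched line for every match of the "igual" pattern.
    val matches: DataStream[String] = cepStream.process(
      new PatternProcessFunction[String, String] {
        override def processMatch(
            m: JMap[String, JList[String]],
            ctx: PatternProcessFunction.Context,
            out: Collector[String]): Unit =
          out.collect(m.get("igual").get(0))
      })

    matches.writeAsText("/home/luca/Desktop/flinkcepout", FileSystem.WriteMode.OVERWRITE)
    env.execute("FlinkCEP sketch")
  }
}
```

Unlike the code in the question, this writes the matched lines rather than the raw input, so the output file should only contain lines that satisfy the "where" condition.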