使用 Apache Flink 从 Kafka 主题消费然后使用 Flink CEP 处理流
Using Apache Flink to consume from a Kafka topic then processing the stream with Flink CEP
在这个项目中,我尝试使用 Flink 从 Kafka 主题中获取数据,然后使用 Flink CEP 处理流以检测模式。
使用 Kafka connect 的部分有效并且正在获取数据,但 CEP 部分由于某种原因不起作用。
我在这个项目中使用 scala。
build.sbt:
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.12.2"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.12.2"
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.12.2"
主要代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util
import java.util.Properties
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.pattern.conditions.IterativeCondition
object flinkExample {
def main(args: Array[String]): Unit = {
val CLOSE_THRESHOLD: Double = 140.00
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
consumer.setStartFromEarliest
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val src: DataStream[String] = see.addSource(consumer)
val keyedStream: DataStream[Stock] = src.map(v => v)
.map {
v =>
val data = v.split(":")
val date = data(0)
val close = data(1).toDouble
Stock(date,close)
}
val pat = Pattern
.begin[Stock]("start")
.where(_.Adj_Close > CLOSE_THRESHOLD)
val patternStream = CEP.pattern(keyedStream, pat)
val result = patternStream.select(
patternSelectFunction = new PatternSelectFunction[Stock, String]() {
override def select(pattern: util.Map[String, util.List[Stock]]): String = {
val data = pattern.get("first").get(0)
data.toString
}
}
)
result.print()
see.execute("ASK Flink Kafka")
}
case class Stock(date: String,
Adj_Close: Double)
{
override def toString: String = s"Stock date: $date, Adj Close: $Adj_Close"
}
}
来自 Kafka 的数据采用字符串格式:“date:value”
Scala 版本:2.11.12
弗林克版本:1.12.2
卡夫卡版本:2.3.0
我正在使用 sbt assembly 构建项目,然后在 flink 仪表板中部署 jar。
在 pattern.get("first")
中,您从模式序列中选择名为“first”的模式,但模式序列只有一个模式,名为“start”。尝试将“first”改为“start”。
此外,CEP 必须能够按时间顺序对流进行排序,以便进行模式匹配。您应该定义水印策略。对于处理时间语义,您可以使用 WatermarkStrategy.noWatermarks()
.
在这个项目中,我尝试使用 Flink 从 Kafka 主题中获取数据,然后使用 Flink CEP 处理流以检测模式。 使用 Kafka connect 的部分有效并且正在获取数据,但 CEP 部分由于某种原因不起作用。 我在这个项目中使用 scala。
build.sbt:
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.12.2"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.12.2"
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.12.2"
主要代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util
import java.util.Properties
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.pattern.conditions.IterativeCondition
object flinkExample {
def main(args: Array[String]): Unit = {
val CLOSE_THRESHOLD: Double = 140.00
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
consumer.setStartFromEarliest
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val src: DataStream[String] = see.addSource(consumer)
val keyedStream: DataStream[Stock] = src.map(v => v)
.map {
v =>
val data = v.split(":")
val date = data(0)
val close = data(1).toDouble
Stock(date,close)
}
val pat = Pattern
.begin[Stock]("start")
.where(_.Adj_Close > CLOSE_THRESHOLD)
val patternStream = CEP.pattern(keyedStream, pat)
val result = patternStream.select(
patternSelectFunction = new PatternSelectFunction[Stock, String]() {
override def select(pattern: util.Map[String, util.List[Stock]]): String = {
val data = pattern.get("first").get(0)
data.toString
}
}
)
result.print()
see.execute("ASK Flink Kafka")
}
case class Stock(date: String,
Adj_Close: Double)
{
override def toString: String = s"Stock date: $date, Adj Close: $Adj_Close"
}
}
来自 Kafka 的数据采用字符串格式:“date:value”
Scala 版本:2.11.12 弗林克版本:1.12.2 卡夫卡版本:2.3.0
我正在使用 sbt assembly 构建项目,然后在 flink 仪表板中部署 jar。
在 pattern.get("first")
中,您从模式序列中选择名为“first”的模式,但模式序列只有一个模式,名为“start”。尝试将“first”改为“start”。
此外,CEP 必须能够按时间顺序对流进行排序,以便进行模式匹配。您应该定义水印策略。对于处理时间语义,您可以使用 WatermarkStrategy.noWatermarks()
.