Spark Streaming 过滤流数据
Spark Streaming Filtering the Streaming data
我正在尝试过滤流式数据,并且基于 id 列的值我想将数据保存到不同的 tables
我有两个 table
- testTable_odd (id,data1,data2)
- testTable_even (id,data1)
如果 id 值是奇数,那么我想将记录保存到 testTable_odd table,如果值是偶数,那么我想将记录保存到 testTable_even.
这里棘手的部分是我的两个 table 有不同的列。尝试了多种方法,考虑了 return 类型 Either[obj1,obj2] 的 Scala 函数,但我无法成功,任何指针将不胜感激。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime
object StreamProcessor extends Serializable {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = args.toSet
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
stream
.map {
case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.foreachRDD(rdd => if (!rdd.isEmpty) rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data")))
}
}
ssc.start()
ssc.awaitTermination()
}
import org.json4s._
import org.json4s.native.JsonMethods._
case class wordCount(id: Long, data1: String, data2: String) extends serializable
implicit val formats = DefaultFormats
def msgParseMaster(msg: String): wordCount = {
val m = parse(msg).extract[wordCount]
return m
}
}
我认为你只是想使用过滤功能两次。你可以做类似
val evenstream = stream.map { case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.filter{ k =>
k._1 % 2 == 0
}
evenstream.foreachRDD{rdd=>
//Do something with even stream
}
val oddstream = stream.map { case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.filter{ k =>
k._1 % 2 == 1
}
oddstream.foreachRDD{rdd=>
//Do something with odd stream
}
当我在一个项目上做类似的事情时 here 如果你在第 191 行附近往下看,我使用了两次过滤函数。在那里,我根据 0 到 1 之间的值对元组进行分类和保存, 请随时查看。
我已经执行了以下步骤。
1) 从原始 JSON 字符串和大小写 class 中提取详细信息
2) 创建了 super JSON (其中包含两个过滤条件所需的详细信息)
3) 将 JSON 转换成 DataFrame
4) 在 JSON 上执行 select 和 where 子句
5) 保存到 Cassandra
我正在尝试过滤流式数据,并且基于 id 列的值我想将数据保存到不同的 tables
我有两个 table
- testTable_odd (id,data1,data2)
- testTable_even (id,data1)
如果 id 值是奇数,那么我想将记录保存到 testTable_odd table,如果值是偶数,那么我想将记录保存到 testTable_even.
这里棘手的部分是我的两个 table 有不同的列。尝试了多种方法,考虑了 return 类型 Either[obj1,obj2] 的 Scala 函数,但我无法成功,任何指针将不胜感激。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime
object StreamProcessor extends Serializable {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = args.toSet
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
stream
.map {
case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.foreachRDD(rdd => if (!rdd.isEmpty) rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data")))
}
}
ssc.start()
ssc.awaitTermination()
}
import org.json4s._
import org.json4s.native.JsonMethods._
case class wordCount(id: Long, data1: String, data2: String) extends serializable
implicit val formats = DefaultFormats
def msgParseMaster(msg: String): wordCount = {
val m = parse(msg).extract[wordCount]
return m
}
}
我认为你只是想使用过滤功能两次。你可以做类似
val evenstream = stream.map { case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.filter{ k =>
k._1 % 2 == 0
}
evenstream.foreachRDD{rdd=>
//Do something with even stream
}
val oddstream = stream.map { case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.filter{ k =>
k._1 % 2 == 1
}
oddstream.foreachRDD{rdd=>
//Do something with odd stream
}
当我在一个项目上做类似的事情时 here 如果你在第 191 行附近往下看,我使用了两次过滤函数。在那里,我根据 0 到 1 之间的值对元组进行分类和保存, 请随时查看。
我已经执行了以下步骤。 1) 从原始 JSON 字符串和大小写 class 中提取详细信息 2) 创建了 super JSON (其中包含两个过滤条件所需的详细信息) 3) 将 JSON 转换成 DataFrame 4) 在 JSON 上执行 select 和 where 子句 5) 保存到 Cassandra