Reading from Cassandra using Spark Streaming
I have a problem when reading from Cassandra with Spark Streaming.
Following the link above, I use
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
to select the data from Cassandra, but it seems Spark Streaming runs the query only once, whereas I want it to keep querying on a 10-second interval.
My code is below; I am looking forward to your reply.
Thanks!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(10))
    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
    //rdd.collect().foreach(println)
    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
    val dstream = ssc.queueStream(rddQueue)
    dstream.print()
    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}
You can create a ConstantInputDStream with the CassandraRDD as input. The ConstantInputDStream will provide the same RDD on each streaming interval, and by executing an action on that RDD you trigger the materialization of the RDD lineage, which runs the query on Cassandra every time.
Make sure that the data being queried does not grow unbounded, to avoid increasing query times and ending up with an unstable streaming process.
Something like this should do the trick (using your code as a starting point):
import org.apache.spark.streaming.dstream.ConstantInputDStream
val ssc = new StreamingContext(conf, Seconds(10))
val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)
dstream.foreachRDD { rdd =>
  // any action will trigger the underlying cassandra query, using collect to have a simple output
  println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()
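If the table keeps growing, one way to honour the advice above is to rebuild the bound of the query on every interval instead of reusing a fixed RDD. This is only a hedged sketch: the users_by_time table, the updated_at timestamp column and the 10-second window are assumptions for illustration, not part of the original schema.
import org.apache.spark.streaming.dstream.ConstantInputDStream
import com.datastax.spark.connector._

// Use an empty constant RDD purely as a per-interval trigger.
val trigger = new ConstantInputDStream(ssc, ssc.sparkContext.emptyRDD[Unit])
trigger.foreachRDD { _ =>
  // Recompute the lower bound each batch so the query stays bounded.
  val cutoff = new java.util.Date(System.currentTimeMillis() - 10000L)
  val recent = ssc.sparkContext
    .cassandraTable("mykeyspace", "users_by_time")   // hypothetical time-partitioned table
    .where("updated_at > ?", cutoff)
  println(recent.collect.mkString("\n"))
}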
I had the same problem and found a solution by creating a subclass of the InputDStream class. You have to define the start() and compute() methods.
start() can be used for preparation; the main logic lives in compute(), which should return an Option[RDD[T]].
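As a minimal sketch of the shape such a subclass takes (the PollingDStream name and the fetch parameter are illustrative only; the full solution follows below):
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// An InputDStream subclass only has to implement start(), stop() and compute().
class PollingDStream[T: ClassTag](ssc: StreamingContext, fetch: () => RDD[T])
  extends InputDStream[T](ssc) {
  override def start(): Unit = {}                        // one-time preparation
  override def stop(): Unit = {}
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(fetch())                                        // one RDD per batch interval
}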
To make the class flexible, the InputStreamQuery trait is defined.
trait InputStreamQuery[T] {
  // where clause condition for the partition key
  def partitionCond : (String, Any)
  // function to return the next partition key
  def nextValue(v: Any) : Option[Any]
  // where clause condition for the clustering key
  def whereCond : (String, (T) => Any)
  // batch size
  def batchSize : Int
}
For the Cassandra table keyspace.test, the materialized view test_by_date is created to reorganize the table by the partition key date.
CREATE TABLE IF NOT EXISTS keyspace.test
(id timeuuid, date text, value text, PRIMARY KEY (id));

CREATE MATERIALIZED VIEW IF NOT EXISTS keyspace.test_by_date AS
SELECT *
FROM keyspace.test
WHERE id IS NOT NULL AND date IS NOT NULL
PRIMARY KEY (date, id)
WITH CLUSTERING ORDER BY ( id ASC );
A possible implementation for the test table would be
case class Test(id: UUID, date: String, value: String)

trait InputStreamQueryTest extends InputStreamQuery[Test] {
  val dateFormat = "uuuu-MM-dd"

  // set batch size to 10 records
  override def batchSize: Int = 10

  // partition key condition: query string and initial value
  override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")

  // clustering key condition: query string and function to get the clustering key from an instance
  override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)

  // return the next value of the partition key, e.g. '2017-10-02' for input value '2017-10-01'
  override def nextValue(v: Any): Option[Any] = {
    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    val formatter = DateTimeFormatter.ofPattern(dateFormat)
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
    if (nextDate.isAfter(LocalDate.now())) None
    else Some(nextDate.format(formatter))
  }
}
It can be used in the CassandraInputStream class as follows.
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.{CassandraTableScanRDD, ValidRDDType}
import com.datastax.spark.connector.rdd.reader.RowReaderFactory

class CassandraInputStream[T: ClassTag]
  (_ssc: StreamingContext, keyspace: String, table: String)
  (implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
  extends InputDStream[T](_ssc) with InputStreamQuery[T] {

  var lastElm: Option[T] = None
  var partitionKey: Any = _

  override def start(): Unit = {
    // find a partition key which stores some records
    def findStartValue(cql: String, value: Any): Any = {
      val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)

      if (rdd.cassandraCount() > 0) value
      else {
        nextValue(value).map(findStartValue(cql, _)).getOrElse(value)
      }
    }
    // get the query string and initial value from the partitionCond method
    val (cql, value) = partitionCond
    partitionKey = findStartValue(cql, value)
  }

  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
    val (cql, _) = partitionCond
    val (wh, whKey) = whereCond

    def fetchNext(patKey: Any): Option[CassandraTableScanRDD[T]] = {
      // query with the partition key condition
      val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)

      val rdd = lastElm.map { x =>
        query.where(wh, whKey(x)).withAscOrder.limit(batchSize)
      }.getOrElse(query.withAscOrder.limit(batchSize))

      if (rdd.cassandraCount() > 0) {
        // store the last element of this RDD
        lastElm = Some(rdd.collect.last)
        Some(rdd)
      }
      else {
        // find the next partition key which stores data
        nextValue(patKey).flatMap { k =>
          partitionKey = k
          fetchNext(k)
        }
      }
    }

    fetchNext(partitionKey)
  }
}
Combining all the classes:
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest
dstream.print()
dstream.saveToCassandra( ... )
ssc.start()
ssc.awaitTermination()
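To compile any of the snippets above you need the Spark Streaming and spark-cassandra-connector artifacts on the classpath; a hedged sbt sketch (the versions are illustrative assumptions, match them to your Spark and Cassandra setup):
// build.sbt (versions are assumptions; align them with your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.5"
)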