通过火花流读取cassandra table时如何在"Where"子句中设置变量?
How to set variables in "Where" clause when reading cassandra table by spark streaming?
我正在使用 Spark Streaming 和 Cassandra 做一些统计。当通过 spark-cassandra-connector 读取 cassandra tables 并通过 ConstantInputDStream 将 cassandra 行 RDD 转换为 DStreamRDD 时,where 子句中的 "CurrentDate" 变量仍然保持在程序启动的同一天。
目的是按某些维度分析截至当前日期的总分,但现在代码 运行s 分析只到它开始 运行ning 的那一天。我运行2019-05-25的代码和之后插入table的数据都收不进去了
我使用的代码如下:
class TestJob extends Serializable {
def test(ssc : StreamingContext) : Unit={
val readTableRdd = ssc.cassandraTable(Configurations.getInstance().keySpace1,Constants.testTable)
.select(
"code",
"date",
"time",
"score"
).where("date<= ?",new Utils().getCurrentDate())
val DStreamRdd = new ConstantInputDStream(ssc,readTableRdd)
DStreamRdd.foreachRDD{r=>
//DO SOMETHING
}
}
}
object GetSSC extends Serializable {
def getSSC() : StreamingContext ={
val conf = new SparkConf()
.setMaster(Configurations.getInstance().sparkHost)
.setAppName(Configurations.getInstance().appName)
.set("spark.cassandra.connection.host", Configurations.getInstance().casHost)
.set("spark.cleaner.ttl", "3600")
.set("spark.default.parallelism","3")
.set("spark.ui.port","5050")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
@transient lazy val ssc = new StreamingContext(sc,Seconds(30))
ssc
}
}
object Main {
val logger : Log = LogFactory.getLog(Main.getClass)
def main(args : Array[String]) : Unit={
val ssc = GetSSC.getSSC()
try{
new TestJob().test(ssc)
ssc.start()
ssc.awaitTermination()
}catch {
case e : Exception =>
logger.error(Main.getClass.getSimpleName+"error :
"+e.printStackTrace())
}
}
}
Table 在此演示中使用如下:
CREATE TABLE test.test_table (
code text PRIMARY KEY, //UUID
date text, // '20190520'
time text, // '12:00:00'
score int); // 90
感谢任何帮助!
一般来说,Spark Cassandra Connector 返回的 RDD 不是流式 RDD - Cassandra 中没有这样的功能,可以订阅更改提要并对其进行分析。您可以通过显式循环和获取数据来实现类似的功能,但这需要仔细设计表,但如果不更深入地研究延迟、数据量等要求,就很难说些什么。
我正在使用 Spark Streaming 和 Cassandra 做一些统计。当通过 spark-cassandra-connector 读取 cassandra tables 并通过 ConstantInputDStream 将 cassandra 行 RDD 转换为 DStreamRDD 时,where 子句中的 "CurrentDate" 变量仍然保持在程序启动的同一天。
目的是按某些维度分析截至当前日期的总分,但现在代码 运行s 分析只到它开始 运行ning 的那一天。我运行2019-05-25的代码和之后插入table的数据都收不进去了
我使用的代码如下:
class TestJob extends Serializable {
def test(ssc : StreamingContext) : Unit={
val readTableRdd = ssc.cassandraTable(Configurations.getInstance().keySpace1,Constants.testTable)
.select(
"code",
"date",
"time",
"score"
).where("date<= ?",new Utils().getCurrentDate())
val DStreamRdd = new ConstantInputDStream(ssc,readTableRdd)
DStreamRdd.foreachRDD{r=>
//DO SOMETHING
}
}
}
object GetSSC extends Serializable {
def getSSC() : StreamingContext ={
val conf = new SparkConf()
.setMaster(Configurations.getInstance().sparkHost)
.setAppName(Configurations.getInstance().appName)
.set("spark.cassandra.connection.host", Configurations.getInstance().casHost)
.set("spark.cleaner.ttl", "3600")
.set("spark.default.parallelism","3")
.set("spark.ui.port","5050")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
@transient lazy val ssc = new StreamingContext(sc,Seconds(30))
ssc
}
}
object Main {
val logger : Log = LogFactory.getLog(Main.getClass)
def main(args : Array[String]) : Unit={
val ssc = GetSSC.getSSC()
try{
new TestJob().test(ssc)
ssc.start()
ssc.awaitTermination()
}catch {
case e : Exception =>
logger.error(Main.getClass.getSimpleName+"error :
"+e.printStackTrace())
}
}
}
Table 在此演示中使用如下:
CREATE TABLE test.test_table (
code text PRIMARY KEY, //UUID
date text, // '20190520'
time text, // '12:00:00'
score int); // 90
感谢任何帮助!
一般来说,Spark Cassandra Connector 返回的 RDD 不是流式 RDD - Cassandra 中没有这样的功能,可以订阅更改提要并对其进行分析。您可以通过显式循环和获取数据来实现类似的功能,但这需要仔细设计表,但如果不更深入地研究延迟、数据量等要求,就很难说些什么。