Joining Kafka and Cassandra DataFrames in Spark Streaming ignores C* predicate pushdown
Intent
I am receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DF from Cassandra. The relation of the (Kafka) streaming batch size to the raw C* data is [a few streaming messages to millions of C* rows], but the join always yields exactly one result [1:1] per message. After the join, the resulting DF is eventually stored to another C* table.
Problem
Even though I am joining the two DFs on the full Cassandra primary key and pushing the corresponding filter to C*, it seems that Spark loads the whole C* data set into memory before actually performing the join (which I would like to prevent by using the filter/predicate pushdown). This leads to a lot of shuffling and tasks being spawned, so the "simple" join takes forever...
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")

  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>

    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Keep only the message payload of each Kafka record
    val MyMsgRdd = rdd.map { case (key, msg) => msg }

    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data-source
    val base_data = base_data_df.getInstance(sqlContext)

    // Left join on prim1_id and prim2_id
    val joinedDf = MyMsgDf.join(base_data,
      MyMsgDf("prim1_id") === base_data("prim1_id") &&
      MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      // Attempt to push the key filter down to C*
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields

    // Persist
  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
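To see which predicates actually reach Cassandra, a minimal diagnostic sketch (reusing the DF and column names from the snippet above, and assuming the per-batch key set is small enough to collect on the driver) might look like this:

// Collect the batch's key values and filter the Cassandra DF with literals,
// then inspect the physical plans: filters the data source accepts are
// typically listed on the Cassandra scan node, whereas the column-to-column
// join condition above is not pushed down at all.
val prim1Keys = MyMsgDf.select($"prim1_id").distinct().collect().map(_.get(0))
val filteredBase = base_data.filter(base_data("prim1_id").isin(prim1Keys: _*))
filteredBase.explain(true)
joinedDf.explain(true)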
Environment
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2
Solution
From the discussions on the DataStax Spark Connector for Apache Cassandra mailing list (ML)
- Joining Kafka and Cassandra DataFrames in Spark Streaming ignores C* predicate pushdown
- How to create a DF from CassandraJoinRDD
I have learned the following:
Quoting Russell Spitzer
This wouldn't be a case of predicate pushdown. This is a join on a partition key column. Currently only joinWithCassandraTable supports this direct kind of join although we are working on some methods to try to have this automatically done within Spark.
Dataframes can be created from any RDD which can have a schema applied to it. The easiest thing to do is probably to map your joinedRDD[x,y] to Rdd[JoinedCaseClass] and then call toDF (which will require importing your sqlContext implicits.) See the DataFrames documentation here for more info.
So the actual implementation now resembles something like:
// Needed for joinWithCassandraTable, AllColumns and SomeColumns
import com.datastax.spark.connector._

// Join the MyMsg RDD with myCassandraTable; the connector issues direct
// C* lookups for the (prim1_id, prim2_id) keys instead of scanning the table
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
  "keyspace",
  "myCassandraTable",
  AllColumns,
  SomeColumns(
    "prim1_id",
    "prim2_id"
  )
).map { case (myMsg, cassandraRow) =>
  JoinedMsg(
    foo = myMsg.foo,
    bar = cassandraRow.getString("bar")
  )
}

// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()
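Note that toDF() on the joined RDD requires the sqlContext implicits in scope (as quoted above) plus a case class carrying the selected fields. A minimal sketch, with placeholder field names and types:

// Hypothetical case class backing the joined records; the foo/bar fields
// are placeholders taken from the mapping example above
case class JoinedMsg(foo: String, bar: String)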
Have you tried joinWithCassandraTable? It should push down to C* all the keys you are looking for...
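For completeness, here is a hedged sketch of how one streaming batch could look once the DataFrame join is replaced by joinWithCassandraTable and the result is persisted to the target table (keyspace, table and column names are placeholders; error handling is omitted):

import com.datastax.spark.connector._
import org.apache.spark.sql.SaveMode

messages.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Drop the Kafka keys and enrich each message with its matching C* row;
  // the connector only queries the partition keys present in this batch
  val enriched = rdd.values
    .joinWithCassandraTable("keyspace", "myCassandraTable",
      AllColumns, SomeColumns("prim1_id", "prim2_id"))
    .map { case (myMsg, row) => JoinedMsg(foo = myMsg.foo, bar = row.getString("bar")) }

  // Persist the enriched records to another C* table
  enriched.toDF().write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "keyspace", "table" -> "target_table"))
    .mode(SaveMode.Append)
    .save()
}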