在 Spark Streaming 中加入 Kafka 和 Cassandra DataFrames 会忽略 C* 谓词下推

Joining Kafka and Cassandra DataFrames in Spark Streaming ignores C* predicate pushdown

意图

我正在通过直接流接收来自 Kafka 的数据,并希望使用来自 Cassandra 的数据丰富消息。 Kafka 消息 (Protobufs) 被解码为数据帧,然后与来自 Cassandra 的(假设是预过滤的)DF 连接。 (Kafka) 流式处理批处理大小与原始 C* 数据的关系是 [几条流式处理消息到数百万 C* 行],但连接始终只产生一个结果 [1:1] 每条消息。 join 后生成的 DF 最终存储到另一个 C* table.

问题

即使我在完整的 Cassandra 主键上加入两个 DF 并将相应的过滤器推送到 C*,似乎 Spark 在实际加入之前将整个 C* 数据集加载到内存中(我想通过使用 filter/predicate 下推来阻止)。这会导致产生大量的洗牌和任务,因此“简单”连接需要永远...

def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("test")      
      .set("spark.cassandra.connection.host", "xxx")
      .set("spark.cassandra.connection.keep_alive_ms", "30000")
      .setMaster("local[*]")
      
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")
    
    // Initialise Kafka
    val kafkaTopics = Set[String]("xxx")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
      "auto.offset.reset" -> "smallest")
    
    // Kafka stream
    val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)      
    
    // Executed on the driver
    messages.foreachRDD { rdd =>
      
      // Create an instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      
      // Map MyMsg RDD
      val MyMsgRdd = rdd.map{case (key, MyMsg) => (MyMsg)}
      
      // Convert RDD[MyMsg] to DataFrame
      val MyMsgDf = MyMsgRdd.toDF()        
        .select(
            $"prim1Id" as 'prim1_id,
            $"prim2Id" as 'prim2_id,
            $...
      )
      
      // Load DataFrame from C* data-source
      val base_data = base_data_df.getInstance(sqlContext)    
      
      // Left join on prim1Id and prim2Id
      val joinedDf = MyMsgDf.join(base_data,
            MyMsgDf("prim1_id") === base_data("prim1_id") &&
            MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
            .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
                && base_data("prim2_id").isin(MyMsgDf("prim2_id")))          
                
      joinedDf.show()
      joinedDf.printSchema()
      
      // Select relevant fields
            
      // Persist
    }
    
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
}

环境

解决方案

来自关于 Apache Cassandra ML 的 DataStax Spark 连接器的讨论

我学到了以下内容:

Quoting Russell Spitzer

  1. This wouldn't be a case of predicate pushdown. This is a join on a partition key column. Currently only joinWithCassandraTable supports this direct kind of join although we are working on some methods to try to have this automatically done within Spark.

  2. Dataframes can be created from any RDD which can have a schema applied to it. The easiest thing to do is probably to map your joinedRDD[x,y] to Rdd[JoinedCaseClass] and then call toDF (which will require importing your sqlContext implicits.) See the DataFrames documentation here for more info.

所以现在的实际实现类似于

// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
  "keyspace",
  "myCassandraTable",
  AllColumns,
  SomeColumns(
      "prim1_id",
      "prim2_id"
  )
).map{case (myMsg, cassandraRow) => 

  JoinedMsg(
    foo = myMsg.foo
    bar = cassandraRow.bar
  )
}

// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()

你试过 joinWithCassandraTable 吗?它应该下推到 C* 您要查找的所有键...