In Scala, what is the correct way to filter a Spark Cassandra RDD by a List[String]?

I have a list of IDs in string format; the list contains roughly 20,000 IDs:

var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // distinct list, we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;

When I use this list against one of my Cassandra tables, it works fine regardless of the size of timelineIdsString:

var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
        .select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
    timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)

When I do the same against my other table, it never completes once the list contains more than 1000 IDs:

var dispositionSource = sc.cassandraTable[DispositionSource]("acd","dispositions_bytimelineid")
            .select("ownerid","dispositionid","month","timelineid","createddate","createduserid")
if(constrain)
    dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString);

Both Cassandra tables have timelineid as a key, so I know that is correct. This code works fine as long as the list of timeline IDs is small.

Is there a better way to filter a Cassandra RDD? Is the size of the IN clause causing it to choke?

You could try keeping the list of IDs as a DataFrame, timelineIds, and inner-joining the table with it on timelineid, then dropping the unnecessary column (timelineIds.timelineid) from the resulting DataFrame.
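A minimal sketch of that approach, assuming the keyspace/table names from the question and an active SparkSession named spark (the toDF conversion of the question's ID RDD is illustrative):

import spark.implicits._
import org.apache.spark.sql.cassandra._

// hypothetical: turn the question's distinct ID RDD into a one-column DataFrame
val timelineIdsDF = timelineIds.map(_.asInstanceOf[String]).toDF("timelineid")

// load the table that chokes on the large IN clause
val dispositions = spark.read.cassandraFormat("dispositions_bytimelineid", "acd").load

// inner join keeps only rows whose timelineid appears in the list;
// drop the duplicate join column contributed by timelineIdsDF
val filtered = dispositions
  .join(timelineIdsDF, dispositions("timelineid") === timelineIdsDF("timelineid"))
  .drop(timelineIdsDF("timelineid"))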

Instead of performing the join at the Spark level, it's better to perform the join using Cassandra itself - in that case you will read only the necessary data from Cassandra (assuming that the join key is the partition key or primary key). For RDDs this is done with the .joinWithCassandraTable function (doc):

import com.datastax.spark.connector._

// an RDD of Tuple1 values that match the table's partition key column "pk"
val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
val joined = toJoin.joinWithCassandraTable("test", "jtest1")
  .on(SomeColumns("pk"))

scala> joined.toDebugString
res21: String =
(8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
 |  ParallelCollectionRDD[147] at parallelize at <console>:33 []
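Adapted to the question's data, a sketch might look like this (it assumes timelineIds is the RDD of distinct IDs from the question):

// join the distinct timeline IDs directly against the partition key,
// instead of pushing a 20,000-element IN (...) predicate
val idTuples = timelineIds.map(id => Tuple1(id.asInstanceOf[String]))

val dispositionsJoined = idTuples
  .joinWithCassandraTable("acd", "dispositions_bytimelineid")
  .on(SomeColumns("timelineid"))
// dispositionsJoined is an RDD of (Tuple1[String], CassandraRow) pairs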

For DataFrames it's called a direct join, available since SCC 2.5 (see the announcement) - you need to pass some configuration to enable it, see the documentation:

import spark.implicits._
import org.apache.spark.sql.cassandra._

val cassdata = spark.read.cassandraFormat("jtest1", "test").load

val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))

scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#0L as int) AS id#2]
   +- *(1) Range (1, 5, step=1, splits=8)
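The configuration mentioned above is the registration of the connector's Catalyst extensions when the session is created; a minimal sketch (the extensions class name is the one documented for SCC 2.5):

import org.apache.spark.sql.SparkSession

// the direct join is planned by the connector's Catalyst extensions,
// so they must be registered when the SparkSession is built
val spark = SparkSession.builder()
  .appName("cassandra-direct-join")
  .config("spark.sql.extensions",
    "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()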

I have a long & detailed blog post about joins with Cassandra - check it out for more details.