In Scala, what is the correct way to filter a Spark Cassandra RDD by a List[String]?
I have a list of IDs in string format; the list contains roughly 20,000 IDs:
var timelineIds = source.map(a => a.timelineid)
timelineIds = timelineIds.distinct.cache // distinct list; we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList
When I use this list against one of my Cassandra tables, it works fine regardless of the size of timelineIdsString:
var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
.select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)
When I do the same against my other table, it never completes once the list has more than 1,000 IDs:
var dispositionSource = sc.cassandraTable[DispositionSource]("acd","dispositions_bytimelineid")
.select("ownerid","dispositionid","month","timelineid","createddate","createduserid")
if(constrain)
dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString);
Both Cassandra tables are keyed on timelineid, so I know that part is correct. The code works fine as long as timelineids is a small list. Is there a better way to filter a Cassandra RDD? Is it the size of the IN clause that is causing it to choke?
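As an aside on the IN-clause size: very large IN lists are known to put heavy load on the coordinator node, and a common workaround is to split the ID list into small batches, filter per batch, and union the results. A minimal sketch in plain Scala (the batch size of 500 and the stand-in ID list are assumptions, not from the question):

```scala
// Stand-in for the real ~20k-element timelineIdsString from the question.
val timelineIdsString: List[String] = (1 to 20000).map(_.toString).toList

// Split the IDs into batches so each IN clause stays small.
val batches: List[List[String]] = timelineIdsString.grouped(500).toList

// With the Spark Cassandra connector (not runnable here), you would then
// filter per batch and union the resulting RDDs, e.g.:
// val filtered = batches
//   .map(b => dispositionSource.where("timelineid IN ?", b))
//   .reduce(_ union _)
```

This keeps every individual query well under the problematic size, at the cost of issuing more queries.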
You could try keeping the list of IDs as a DataFrame (timelineIds) and inner-joining the table with it on timelineid, then dropping the unnecessary column (timelineIds.timelineid) from the resulting df.
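A minimal sketch of that approach, assuming `spark.implicits` are in scope and that `timelineIds` is an `RDD[String]` as in the question (the table and column names below mirror the question; everything else is illustrative):

```scala
import org.apache.spark.sql.cassandra._

// The ~20k distinct IDs as a single-column DataFrame.
val timelineIdsDf = timelineIds.toDF("timelineid")

// Load the Cassandra table as a DataFrame.
val dispositions = spark.read
  .cassandraFormat("dispositions_bytimelineid", "acd")
  .load()

// Inner join keeps only the rows whose timelineid appears in the list.
// Joining on a column name Seq avoids a duplicate timelineid column,
// so there is nothing extra to drop afterwards.
val filtered = dispositions.join(timelineIdsDf, Seq("timelineid"), "inner")
```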
Instead of performing the join at the Spark level, you can let Cassandra itself perform the join. In that case you read only the necessary data from Cassandra (assuming the join key is the partition key or primary key). For RDDs, this is done with the .joinWithCassandraTable function (doc):
import com.datastax.spark.connector._
val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
val joined = toJoin.joinWithCassandraTable("test", "jtest1")
.on(SomeColumns("pk"))
scala> joined.toDebugString
res21: String =
(8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
| ParallelCollectionRDD[147] at parallelize at <console>:33 []
For DataFrames, this is called a direct join and is available since SCC 2.5 (see the announcement). You need to pass some configuration to enable it; see the documentation:
import spark.implicits._
import org.apache.spark.sql.cassandra._
val cassdata = spark.read.cassandraFormat("jtest1", "test").load
val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))
scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#0L as int) AS id#2]
+- *(1) Range (1, 5, step=1, splits=8)
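For reference, enabling the direct join means registering the connector's Spark SQL extensions when building the session. A sketch of the typical configuration (verify the exact property value against the SCC documentation for your version):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cassandra-direct-join")
  // Registers Cassandra-specific Catalyst rules, including the direct join optimization.
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()
```

The same setting can be passed on the command line via `--conf spark.sql.extensions=...` with spark-submit or spark-shell.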
I have a long and detailed blog post about joins with Cassandra; check it out for more details.