Spark java filter isin 方法还是其他?

Spark java filter isin method or something else?

我的 cassandra 数据库中有大约 20 亿行,我根据具有 4827 个字符串的实验列表使用 isin 方法进行过滤,如下所示。但是,我注意到在 distinct 命令之后我只有 4774 个唯一行。任何想法为什么 53 失踪? isin 方法是否有 threshold/limitations?我对实验列表进行了双重和三次检查,它确实有 4827 个字符串,并且数据库中确实存在其他 53 个字符串,因为我可以使用 cqlsh 查询它们。非常感谢任何帮助!

Dataset<Row> df1 = sp.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mdb");
                        put("table", "experiment");
                    }
                })
                .load().select(col("experimentid")).filter(col("experimentid").isin(experimentlist.toArray()));
List<String> tmplist=df1.distinct().as(Encoders.STRING()).collectAsList();
   
System.out.println("tmplist "+tmplist.size());

关于“丢失数据”的实际问题 - 当您的集群缺少写入并且没有定期进行修复时,可能会出现问题。 Spark Cassandra Connector (SCC) 读取一致性级别 LOCAL_ONE 的数据,可能会命中没有所有数据的节点。例如,您可以尝试将一致性级别设置为 LOCAL_QUORUM(通过 --conf spark.cassandra.input.consistency.level=LOCAL_QUORUM),然后重复实验,但最好确保数据已修复。

您遇到的另一个问题是您正在使用 .isin 函数 - 它正在转换为查询 SELECT ... FROM table WHERE partition_key IN (list)。查看执行计划:

scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._
scala> val data = spark.read.cassandraFormat("m1", "test").load()
data: org.apache.spark.sql.DataFrame = [id: int, m: map<int,string>]

scala> data.filter($"id".isin(Seq(1,2,3,4):_*)).explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#169,m#170] PushedFilters: [*In(id, [1,2,3,4])], ReadSchema: struct<id:int,m:map<int,string>>

这个查询非常低效,给执行查询的节点增加了额外的负载。在 SCC 2.5.0 中,对此进行了一些优化,但最好使用也是 introduced in the SCC 2.5.0 的所谓“Direct Join”,因此 SCC 将并行执行对特定分区键的请求 - 这样更有效并减少节点的负载。您可以按如下方式使用它(唯一的区别是我将它称为“DSE Direct Join”,而在 OSS SCC 中它打印为“Cassandra Direct Join”):

scala> val toJoin = Seq(1,2,3,4).toDF("id")
toJoin: org.apache.spark.sql.DataFrame = [id: int]

scala> val joined = toJoin.join(data, data("id") === toJoin("id"))
joined: org.apache.spark.sql.DataFrame = [id: int, id: int ... 1 more field]

scala> joined.explain
== Physical Plan ==
DSE Direct Join [id = id#189] test.m1 - Reading (id, m) Pushed {}
+- LocalTableScan [id#189]

此直接连接优化需要显式启用为 described in the documentation