CassandraSourceRelation not serializable when joining two dataframes

I set up DataFrames using spark-cassandra-connector 1.6.2 and am trying to do some transformations with Cassandra. The DataStax Enterprise version is 5.0.5.

DataFrame df1 =  sparkContext
            .read().format("org.apache.spark.sql.cassandra")
            .options(readOptions).load()
            .where("field2 ='XX'")
            .limit(limitVal)
            .repartition(partitions);

// Note: DataFrame has no getColumn(); select the column, deduplicate, and map rows to strings
List<String> distinctKeys = df1.select("field3").distinct().javaRDD()
        .map(row -> row.getString(0)).collect();

values = some transformations to get IN query values;

String cassandraQuery = String.format("SELECT * FROM "
            + "table2 "
            + "WHERE field2 = 'XX' "
            + "AND field3 IN (%s)", values);
DataFrame df2 = sparkContext.cassandraSql(cassandraQuery);
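For illustration only (not from the original post): one plausible way to build the elided `values` string from the collected keys is to quote each key and join with commas before formatting it into the IN clause. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InClause {
    public static void main(String[] args) {
        // Hypothetical stand-in for the keys collected from df1
        List<String> distinctKeys = Arrays.asList("a", "b", "c");

        // Quote each key and join with commas for the IN (...) clause
        String values = distinctKeys.stream()
                .map(k -> "'" + k + "'")
                .collect(Collectors.joining(", "));

        String cassandraQuery = String.format(
                "SELECT * FROM table2 WHERE field2 = 'XX' AND field3 IN (%s)", values);
        System.out.println(cassandraQuery);
        // → SELECT * FROM table2 WHERE field2 = 'XX' AND field3 IN ('a', 'b', 'c')
    }
}
```

Note that string-built IN clauses like this are fine for a sketch but do no escaping; treat the key values accordingly.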

String column1 = "field3";
String column2 = "field4";
List<String> columns = new ArrayList<>();
        columns.add(column1);
        columns.add(column2);
scala.collection.Seq<String> usingColumns =
        scala.collection.JavaConverters
                .collectionAsScalaIterableConverter(columns).asScala().toSeq();
DataFrame joined = df1.join(df2, usingColumns, "left_outer");

List<Row> collected = joined.collectAsList(); // doesn't work
Long count = joined.count(); // works

Here is the exception log. It looks like Spark is creating the Cassandra source relation and it cannot be serialized.

java.io.NotSerializableException: java.util.ArrayList$Itr
Serialization stack:
- object not serializable (class: 
org.apache.spark.sql.cassandra.CassandraSourceRelation, value:  
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496)
- field (class: org.apache.spark.sql.execution.datasources.LogicalRelation, 
name: relation, type: class org.apache.spark.sql.sources.BaseRelation)
- object (class org.apache.spark.sql.execution.datasources.LogicalRelation, 
Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496 
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Filter, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Filter, Filter 
(field2#0 = XX)
+- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Repartition, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Repartition, 
Repartition 4, true
+- Filter (field2#0 = XX)
+- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Join, name: left, 
type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Join, Join 
LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
:  +- Filter (field2#0 = XX)
:     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
+- Project [fields]
+- Filter ((field2#17 = YY) && field3#18 IN (IN array))
  +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7172525e
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Project, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Project, Project 
[fields]
+- Join LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
:  +- Filter (field2#0 = XX)
:     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
+- Project [fields]
  +- Filter ((field2#17 = XX) && field3#18 IN (IN array))
     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7172525e
)
- field (class: org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun, name: 
$outer, type: class org.apache.spark.sql.catalyst.trees.TreeNode)
- object (class org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun, 
<function1>)
- field (class: 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply, 
name: $outer, type: class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun)
- object (class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$$anonfun$apply, 
<function1>)
- field (class: scala.collection.immutable.Stream$$anonfun$map, name: f, 
type: interface scala.Function1)
- object (class scala.collection.immutable.Stream$$anonfun$map, <function0>)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, 
List(org.apache.spark.OneToOneDependency@17f43f4a))
- field (class: org.apache.spark.rdd.RDD, name: 
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[32] at 
collectAsList at RevisionPushJob.java:308)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect, name: $outer, 
type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect, <function0>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun, name: 
$outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun, 
<function1>)

Can it be made serializable? Why does the count operation work but the collect operation does not?

Update:

After coming back to it, it turns out that in Java you first have to convert the Java Iterable into a Scala Buffer and create the Scala Iterable -> Seq from that. Otherwise it does not work. Thanks to Russel for drawing my attention to the cause of the problem.

String attrColumn1 = "column1";
String attrColumn2 = "column2";
String attrColumn3 = "column3";
String attrColumn4 = "column4";
List<String> attrColumns = new ArrayList<>();
attrColumns.add(attrColumn1);
attrColumns.add(attrColumn2);
attrColumns.add(attrColumn3);
attrColumns.add(attrColumn4);
Seq<String> usingAttrColumns =
        JavaConverters.asScalaBufferConverter(attrColumns).asScala().toList();

Seeing that the error message points to java.util.ArrayList$Itr as your non-serializable bit, I think it may be a reference to

 List<String> columns = new ArrayList<>();
    columns.add(column1);
    columns.add(column2);

where an implicit conversion may require serializing the ArrayList's iterator? This is the only ArrayList I see, so it is probably the culprit. It could also be in the code you elided for "values".
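The shape of that failure can be reproduced in plain Java, without Spark at all: a serializable closure that captures an ArrayList's iterator fails Java serialization with exactly the ArrayList$Itr message from the stack trace above. A minimal sketch (standalone, just standard Java serialization):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorCapture {
    public static void main(String[] args) throws Exception {
        List<String> columns = new ArrayList<>(Arrays.asList("field3", "field4"));

        // A serializable closure that (wrongly) holds on to the ArrayList's
        // iterator -- the same shape as the lazy Stream in the stack trace.
        Iterator<String> it = columns.iterator();
        Runnable task = (Runnable & Serializable) () -> System.out.println(it.next());

        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(task);
            System.out.println("serialized OK");
        } catch (NotSerializableException e) {
            // The captured iterator is the non-serializable bit
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

In the question's code the capture happens indirectly: the Iterable-based conversion's `toSeq()` can produce a lazy Scala Stream whose unevaluated tail references the ArrayList iterator, whereas the Buffer-based `toList()` in the update forces a strict, fully materialized list up front.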

When you do a count it can throw away the column information, which may be what saves you there, but I cannot be sure.

So, TL;DR, my suggestion is to try removing and replacing pieces of your code and rebuilding it to find the non-serializable bit.