无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>
Can not union two CassandraJavaRDD<CassandraRow> in Spark
由于从 Cassandra 查询数据有限制,我尝试使用 Spark 逐批读取数据并将其存储在 RDD 中。
然后我使用联合函数添加所有 RDD。
这是我的代码。
private void getDataFromCassandra(JavaSparkContext sc) {
CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;
While(Some Condition)
cassandraRDD = CassandraJavaUtil
.javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
.where("pid IN ('" + sb + "')");
if(cassandraRDD2==null){
cassandraRDD2=cassandraRDD;
}
else{
cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
}
}
}
但是在 union 中我遇到了以下错误。
类型不匹配:无法从 JavaRDD 转换为 CassandraJavaRDD
尽管两个 RDD 的类型相似。
所以 1) 我应该将 Cast 应用为
cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);
2) 或者将其中一个RDD的Type改为JavaRDD
问题的发生是因为根据 docs:
Method: union(JavaRDD other) Return the union of this RDD and another one.
Return Value: JavaRDD
因此不匹配。
因为根据this:
public class CassandraJavaRDD<R> extends JavaRDD<R> {
...
}
CassandraJavaRDD
class 扩展了 JavaRDD
所以你可以使用:
JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;
因此 union()
方法的 return 值将匹配其类型。
由于从 Cassandra 查询数据有限制,我尝试使用 Spark 逐批读取数据并将其存储在 RDD 中。
然后我使用联合函数添加所有 RDD。
这是我的代码。
private void getDataFromCassandra(JavaSparkContext sc) {
CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;
While(Some Condition)
cassandraRDD = CassandraJavaUtil
.javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
.where("pid IN ('" + sb + "')");
if(cassandraRDD2==null){
cassandraRDD2=cassandraRDD;
}
else{
cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
}
}
}
但是在 union 中我遇到了以下错误。
类型不匹配:无法从 JavaRDD 转换为 CassandraJavaRDD
尽管两个 RDD 的类型相似。
所以 1) 我应该将 Cast 应用为
cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);
2) 或者将其中一个RDD的Type改为JavaRDD
问题的发生是因为根据 docs:
Method: union(JavaRDD other) Return the union of this RDD and another one.
Return Value: JavaRDD
因此不匹配。
因为根据this:
public class CassandraJavaRDD<R> extends JavaRDD<R> {
...
}
CassandraJavaRDD
class 扩展了 JavaRDD
所以你可以使用:
JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;
因此 union()
方法的 return 值将匹配其类型。