无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>

Can not union two CassandraJavaRDD<CassandraRow> in Spark

由于从 Cassandra 查询数据有限制,我尝试使用 Spark 逐批读取数据并将其存储在 RDD 中。

然后我使用联合函数添加所有 RDD。

这是我的代码。

private void getDataFromCassandra(JavaSparkContext sc) {


    CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
    CassandraJavaRDD<CassandraRow> cassandraRDD2  = null;

    While(Some Condition)

     cassandraRDD = CassandraJavaUtil
                .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
                .where("pid IN ('" + sb + "')");

    if(cassandraRDD2==null){


     cassandraRDD2=cassandraRDD;
    }
    else{
        cassandraRDD2 =  cassandraRDD2.union(cassandraRDD);
    }
}             

}

但是在 union 中我遇到了以下错误。

类型不匹配:无法从 JavaRDD 转换为 CassandraJavaRDD

尽管两个 RDD 的类型相似。

所以 1) 我应该将 Cast 应用为

 cassandraRDD2 =  (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);

2) 或者将其中一个RDD的Type改为JavaRDD

问题的发生是因为根据 docs:

Method: union(JavaRDD other) Return the union of this RDD and another one.

Return Value: JavaRDD

因此不匹配。

因为根据this:

public class CassandraJavaRDD<R> extends JavaRDD<R> {
...
}

CassandraJavaRDD class 扩展了 JavaRDD 所以你可以使用:

JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;

因此 union() 方法的 return 值将匹配其类型。