在 spark 中保存到 cassandra,并行化方法在 java 中不可用
Save to cassandra in spark, parallelize method is not availble in java
我正在尝试使用 java 中的 spark 仅将一行保存到 cassandra table(这是在 spark 中进行长时间处理后的结果),我正在使用新方法进行连接使用 spark session 到 cassandra 如下:
SparkSession spark = SparkSession
.builder()
.appName("App")
.config("spark.cassandra.connection.host", "cassandra1.example.com")
.config("spark.cassandra.connection.port", "9042")
.master("spark://cassandra.example.com:7077")
.getOrCreate();
连接成功并且运行良好,因为我将 Spark 安装在与 cassandra 相同的节点上,从 cassandra 读取一些 RDD 后我想保存到 cassandra 中的另一个 table,所以我正在关注文档here,即保存到cassandra的部分如下:
List<Person> people = Arrays.asList(
new Person(1, "John", new Date()),
new Person(2, "Troy", new Date()),
new Person(3, "Andrew", new Date())
);
JavaRDD<Person> rdd = spark.sparkContext().parallelize(people);
javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();
我面临的问题是不接受并行化方法,并且只有一个 scala 版本可用,错误是:
The method parallelize(Seq<T>, int, ClassTag<T>) in the type
SparkContext is not applicable for the arguments (List<Person>)
我如何在 Java 中使用它来保存到 cassandra table?
到parallelize
java.util.List
你可以使用JavaSparkContext
(不是SparkContext
),像这样:
import org.apache.spark.api.java.JavaSparkContext;
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.parallelize(people);
我正在尝试使用 java 中的 spark 仅将一行保存到 cassandra table(这是在 spark 中进行长时间处理后的结果),我正在使用新方法进行连接使用 spark session 到 cassandra 如下:
SparkSession spark = SparkSession
.builder()
.appName("App")
.config("spark.cassandra.connection.host", "cassandra1.example.com")
.config("spark.cassandra.connection.port", "9042")
.master("spark://cassandra.example.com:7077")
.getOrCreate();
连接成功并且运行良好,因为我将 Spark 安装在与 cassandra 相同的节点上,从 cassandra 读取一些 RDD 后我想保存到 cassandra 中的另一个 table,所以我正在关注文档here,即保存到cassandra的部分如下:
List<Person> people = Arrays.asList(
new Person(1, "John", new Date()),
new Person(2, "Troy", new Date()),
new Person(3, "Andrew", new Date())
);
JavaRDD<Person> rdd = spark.sparkContext().parallelize(people);
javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();
我面临的问题是不接受并行化方法,并且只有一个 scala 版本可用,错误是:
The method parallelize(Seq<T>, int, ClassTag<T>) in the type
SparkContext is not applicable for the arguments (List<Person>)
我如何在 Java 中使用它来保存到 cassandra table?
到parallelize
java.util.List
你可以使用JavaSparkContext
(不是SparkContext
),像这样:
import org.apache.spark.api.java.JavaSparkContext;
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.parallelize(people);