使用 java spark 将数据集保存到 cassandra

saving dataset to cassandra using java spark

我正在尝试使用 java spark 将数据集保存到 cassandra db。 我能够使用以下代码成功将数据读入数据集

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

但是当我尝试编写数据集时,我得到了 IOException:无法加载或找到 table,在键空间 table 中找到了类似的

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

我正在 sparksession 中设置主机和端口 问题是我能够以覆盖和追加模式写入但无法创建 table

我使用的版本如下: 火花 java 2.0 火花卡桑德拉连接器 2.3

尝试了不同的 jar 版本,但没有任何效果 我还经历了不同的堆栈溢出和 github 链接

非常感谢任何帮助。

Spark 中的 write 操作没有自动为您创建 table 的模式 - 有多种原因。其中之一是您需要为您的 table 定义一个主键,否则,如果您设置了不正确的主键,您可能会覆盖数据。因此,Spark Cassandra Connector provides a separate method to create a table based on your dataframe structure, but you need to provide a list of partition & clustering key columns. In Java it will look as following (full code is here):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
          CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
          partitionSeqlist, clusteringSeqlist, connector);

然后就可以照常写入数据了:

dataset.write()
   .format("org.apache.spark.sql.cassandra")
   .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
   .save();