将 Spark 数据集 [(String, Map[String, String])] 插入到 Cassandra Table
Insert Spark Dataset[(String, Map[String, String])] to Cassandra Table
我有一个类型为 Dataset[(String, Map[String, String])] 的 Spark Dataset。
我必须将其插入 Cassandra table。
在这里,Dataset[(String, Map[String, String])] 中的键将成为我在 Cassandra 中的行的主键。
Dataset[(String, Map[String, String])] 中的 Map 将在列的同一行中 ColumnNameValueMap.
数据集可以有数百万行。
我也想以最佳方式进行(例如批量插入等)
我的 Cassandra table 结构是:
CREATE TABLE SampleKeyspace.CassandraTable (
RowKey text PRIMARY KEY,
ColumnNameValueMap map<text,text>
);
请建议如何做同样的事情。
你所需要的只是使用Spark Cassandra Connector(最好使用刚刚发布的2.5.0版本)。它为数据集提供读写功能,所以在你的情况下它只是
import org.apache.spark.sql.cassandra._
your_data.write.cassandraFormat("CassandraTable", "SampleKeyspace").mode("append").save()
如果您的 table 还不存在,那么 you can create it base don the structure of the dataset itself - 有 2 个函数:createCassandraTable
和 createCassandraTableEx
- 最好使用第二个,因为它提供对 table 创建的更多控制。
P.S。您可以在以下 blog post.
中找到有关 2.5.0 版本的更多信息
我有一个类型为 Dataset[(String, Map[String, String])] 的 Spark Dataset。
我必须将其插入 Cassandra table。
在这里,Dataset[(String, Map[String, String])] 中的键将成为我在 Cassandra 中的行的主键。
Dataset[(String, Map[String, String])] 中的 Map 将在列的同一行中 ColumnNameValueMap.
数据集可以有数百万行。
我也想以最佳方式进行(例如批量插入等)
我的 Cassandra table 结构是:
CREATE TABLE SampleKeyspace.CassandraTable (
RowKey text PRIMARY KEY,
ColumnNameValueMap map<text,text>
);
请建议如何做同样的事情。
你所需要的只是使用Spark Cassandra Connector(最好使用刚刚发布的2.5.0版本)。它为数据集提供读写功能,所以在你的情况下它只是
import org.apache.spark.sql.cassandra._
your_data.write.cassandraFormat("CassandraTable", "SampleKeyspace").mode("append").save()
如果您的 table 还不存在,那么 you can create it base don the structure of the dataset itself - 有 2 个函数:createCassandraTable
和 createCassandraTableEx
- 最好使用第二个,因为它提供对 table 创建的更多控制。
P.S。您可以在以下 blog post.
中找到有关 2.5.0 版本的更多信息