如何通过更改架构和添加其他属性将 DataFrame 从 Spark 保存到 Cassandra table

How to save a DataFrame from Spark to Cassandra table by changing the schema and adding additional properties

我已经使用 Spark SQL 从 Cassandra 数据库中检索数据:

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
                "WHERE CAST(store_id as string) = '" + storeId + "'");

之后我做了一些过滤,我想将这些数据保存到另一个 Cassandra table 中,如下所示:

store_id uuid,
report_name text,
report_time timestamp,
sharder int,
customer_email text,
count int static,
firts_name text,
last_name text,
PRIMARY KEY ((store_id, report_name, report_time, sharder), customer_email)

当我将 DataFrame 保存到新的 table 时,如何添加这些附加属性?另外,使用此示例对 Cassandra 长行进行分片的最佳做法是什么?我希望在 DataFrame 中有 4k-6k 条记录,所以必须对长行进行分片,但我不确定是否计算记录然后更改 sharder 一定数量的项目是Spark 或 Cassandra 中的最佳实践。

您将需要执行某种 transformation(例如 map())以将属性添加到数据框。

拥有 DataFrame 后,您可以定义一个案例 class,它具有新架构的结构和添加的属性。

您可以像这样创建案例 class:case class DataFrameRecord(property1: String, property2: Long, property3: String, property4: Double)

然后就可以用map转换成使用caseclass的新结构了:df.rdd.map(p => DataFrameRecord(prop1, prop2, prop3, prop4)).toDF()