如何使用 Spark DataFrames 和 Cassandra 设置命名策略
How to set naming strategy using Spark DataFrames and Cassandra
我有案例 类 使用 lowerCamelCase 变量名称约定。
Cassandra 对行和名称使用 lower_underscore 约定。
有没有办法让DataFrame自动转换名字?
Table 定义:
CREATE TABLE data_storage.dummy (
some_name text,
some_value text,
PRIMARY KEY (some_name)
);
Spark代码:
val ds = List(Dummy("key", 1)).toDS()
ds.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "data_storage")
.option("table", "dummy")
.save
我编写了一个函数,将名称从驼峰更改为下划线,反之亦然,但是在 some 遗留案例 类 的情况下它不起作用使用 lower_underscore 命名约定。
private def toUnderscore(ds: DataFrame): DataFrame = {
normalizeNames(ds, CaseFormat.LOWER_CAMEL, CaseFormat.LOWER_UNDERSCORE)
}
private def toCamel(ds: DataFrame): DataFrame =
normalizeNames(ds, CaseFormat.LOWER_UNDERSCORE, CaseFormat.LOWER_CAMEL)
private def normalizeNames(df: DataFrame, from: CaseFormat, to: CaseFormat): DataFrame = {
def normalizeName(c: String): String = from.to(to, c.toString)
def flattenSchema(schema: StructType): StructType = {
StructType(schema.fields.map {
case StructField(name, inner: StructType, nullable, metadata) =>
StructField(normalizeName(name), flattenSchema(inner), nullable, metadata)
case StructField(name, array: ArrayType, nullable, metadata) =>
StructField(
normalizeName(name), array.copy(elementType = array.elementType match {
case s: StructType => flattenSchema(s)
case x => x
}), nullable, metadata)
case StructField(name, flat, nullable, metadata) =>
StructField(normalizeName(name), flat, nullable, metadata)
})
}
没有用于这种名称转换的内置方法。 RDD 接口有一些额外的方法围绕它的类型转换来进行这些类型的转换,但通常我们没有为 DataSets/DataFrames.
添加机制
我有案例 类 使用 lowerCamelCase 变量名称约定。 Cassandra 对行和名称使用 lower_underscore 约定。 有没有办法让DataFrame自动转换名字?
Table 定义:
CREATE TABLE data_storage.dummy (
some_name text,
some_value text,
PRIMARY KEY (some_name)
);
Spark代码:
val ds = List(Dummy("key", 1)).toDS()
ds.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "data_storage")
.option("table", "dummy")
.save
我编写了一个函数,将名称从驼峰更改为下划线,反之亦然,但是在 some 遗留案例 类 的情况下它不起作用使用 lower_underscore 命名约定。
private def toUnderscore(ds: DataFrame): DataFrame = {
normalizeNames(ds, CaseFormat.LOWER_CAMEL, CaseFormat.LOWER_UNDERSCORE)
}
private def toCamel(ds: DataFrame): DataFrame =
normalizeNames(ds, CaseFormat.LOWER_UNDERSCORE, CaseFormat.LOWER_CAMEL)
private def normalizeNames(df: DataFrame, from: CaseFormat, to: CaseFormat): DataFrame = {
def normalizeName(c: String): String = from.to(to, c.toString)
def flattenSchema(schema: StructType): StructType = {
StructType(schema.fields.map {
case StructField(name, inner: StructType, nullable, metadata) =>
StructField(normalizeName(name), flattenSchema(inner), nullable, metadata)
case StructField(name, array: ArrayType, nullable, metadata) =>
StructField(
normalizeName(name), array.copy(elementType = array.elementType match {
case s: StructType => flattenSchema(s)
case x => x
}), nullable, metadata)
case StructField(name, flat, nullable, metadata) =>
StructField(normalizeName(name), flat, nullable, metadata)
})
}
没有用于这种名称转换的内置方法。 RDD 接口有一些额外的方法围绕它的类型转换来进行这些类型的转换,但通常我们没有为 DataSets/DataFrames.
添加机制