com.datastax.spark.connector.types.TypeConversionException: 无法将类型 scala.collection.immutable.Map$Map1 的对象映射转换为 (AnyRef, AnyRef)
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map of type scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)
我有一个具有以下模式的 Spark DataFrame。
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
我正在写信给 Cassandra table。
Cassandra table 架构如下:
create table provision_bmss.bmss_cust (
partition_key text,
row_key text,
group int,
attributes map<text,text>,
data_as_of_date text,
PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = 'FALSE'
AND default_time_to_live = '34560000';
我正在使用 Spark Datastax Connector 在以下逻辑之后写入 table:
val maxItem = 65000
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
我收到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)
我认为这与代码中的 .agg(collect_list(map($"key", $"value")).as("attributes"))
行有关。
在这里,Map
中的所有内容都是类型 <String, String>
我无法解决同样的问题。有人可以帮忙吗。
输出的 DataFrame 模式如下(与预期不同):
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
预期的输出 DataFrame 架构如下:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
本题前题为
我提到了这个 post -
我能够使用 flatten 和 toMap 函数做同样的事情。
更新后的工作代码如下:
val joinMap = udf {
values: Seq[Map[String,String]] => values.flatten.toMap
}
def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {
val maxItem = 65000
val dfPreFinal =
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
}
有没有更好的方法(可能不用UDF)?
我有一个具有以下模式的 Spark DataFrame。
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
我正在写信给 Cassandra table。
Cassandra table 架构如下:
create table provision_bmss.bmss_cust (
partition_key text,
row_key text,
group int,
attributes map<text,text>,
data_as_of_date text,
PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = 'FALSE'
AND default_time_to_live = '34560000';
我正在使用 Spark Datastax Connector 在以下逻辑之后写入 table:
val maxItem = 65000
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
我收到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)
我认为这与代码中的 .agg(collect_list(map($"key", $"value")).as("attributes"))
行有关。
在这里,Map
中的所有内容都是类型 <String, String>
我无法解决同样的问题。有人可以帮忙吗。
输出的 DataFrame 模式如下(与预期不同):
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
预期的输出 DataFrame 架构如下:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
本题前题为
我提到了这个 post -
我能够使用 flatten 和 toMap 函数做同样的事情。
更新后的工作代码如下:
val joinMap = udf {
values: Seq[Map[String,String]] => values.flatten.toMap
}
def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {
val maxItem = 65000
val dfPreFinal =
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
.withColumn("group", $"pos".divide(maxItem).cast("int"))
.groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
.agg(collect_list(map($"key", $"value")).as("attributes"))
.select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))
.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.options(Map( "keyspace" -> keySpace, "table" -> tableName ))
.save()
}
有没有更好的方法(可能不用UDF)?