com.datastax.spark.connector.types.TypeConversionException: 无法将类型 scala.collection.immutable.Map$Map1 的对象映射转换为 (AnyRef, AnyRef)

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map of type scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)

我有一个具有以下模式的 Spark DataFrame。

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

我正在写信给 Cassandra table。

Cassandra table 架构如下:

create table provision_bmss.bmss_cust (
  partition_key text,
  row_key text,
  group int,
  attributes map<text,text>,
  data_as_of_date text,
  PRIMARY KEY (partition_key, row_key, group)
  )
  WITH cdc = 'FALSE'
  AND default_time_to_live = '34560000';

我正在使用 Spark Datastax Connector 在以下逻辑之后写入 table:

val maxItem = 65000
  
  dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map( "keyspace" -> keySpace, "table" -> tableName ))
  .save()

我收到以下错误:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)

我认为这与代码中的 .agg(collect_list(map($"key", $"value")).as("attributes")) 行有关。

在这里,Map 中的所有内容都是类型 <String, String>

我无法解决同样的问题。有人可以帮忙吗。

输出的 DataFrame 模式如下(与预期不同):

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- group: int (nullable = true)
 |-- attributes: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

预期的输出 DataFrame 架构如下:

root
     |-- partition_key: string (nullable = true)
     |-- row_key: string (nullable = true)
     |-- group: int (nullable = true)
     |-- attributes: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
     |-- data_as_of_date: string (nullable = true)

本题前题为

我提到了这个 post -

我能够使用 flatten 和 toMap 函数做同样的事情。

更新后的工作代码如下:

val joinMap = udf {
  values: Seq[Map[String,String]] => values.flatten.toMap
}

def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {
   
  val maxItem = 65000
  
  val dfPreFinal = 
  dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
  
  val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))
  
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map( "keyspace" -> keySpace, "table" -> tableName ))
  .save() 
}

有没有更好的方法(可能不用UDF)?