如何在 Scala+Spark 中读取 csv 文件并将一列转换为 Map[String, String] 类型?

How to read csv file and convert one column to Map[String, String] type in Scala+Spark?

我有一个包含多列的 .csv 文件。

一行为例:

aaa,bbb,{'foo': 'xxx', 'bar': 'zzz'}

我想阅读它并转换为以下类型模式:

field1: String,
field2: String,
field3: Map[String, String]

我可以用这样的原始类型来做到这一点:

private val someSchema =
    StructType(
      StructField("field1", StringType, true) ::
      StructField("field2", StringType, true) ::
      StructField("field3", StringType, true) :: Nil)

     spark.read
      .format("csv")
      .option("header", true)
      .schema(someSchema)
      .load("path.csv")

但是当涉及到 Map[String, String] 时,由于

它不起作用

Exception in thread "main" org.apache.spark.sql.AnalysisException: CSV data source does not support map<string,string> data type.

我怎样才能用其他方式做到这一点?

您需要将其作为字符串读取,然后使用 from_json 函数将其转换为 MapType 以将其解析为 map<string,string>:

val result = df.withColumn(
  "field3",
  from_json(col("field3"), lit("map<string,string>"))
)

但是,从您的示例来看,文件中的值似乎没有转义,spark 将无法解析它,因为列 field3.

的值中有逗号(分隔符)

在这种情况下,您可以将文件作为文本读取,然后将 {} 中的逗号替换为另一个分隔符,例如 ;,然后用 , 拆分以获得 3 列,然后使用 str_to_map 函数将列 field3 转换为映射:

val df = spark.text("/path/file.csv")

val result = df.withColumn(
    "value",
    split(regexp_replace(col("value"), ",(?=[^{}]*\})", ";"), ",")
).select(
    col("value")(0).as("field1"),
    col("value")(1).as("field2"),
    regexp_replace(col("value")(2), "[{}' ]", "").as("field3")
).withColumn(
    "field3",
    expr("str_to_map(field3, ';', ':')")
)

result.show
//+------+------+------------------------+
//|field1|field2|field3                  |
//+------+------+------------------------+
//|aaa   |bbb   |[foo -> xxx, bar -> zzz]|
//+------+------+------------------------+