如何在 Scala+Spark 中读取 csv 文件并将一列转换为 Map[String, String] 类型?
How to read csv file and convert one column to Map[String, String] type in Scala+Spark?
我有一个包含多列的 .csv
文件。
一行为例:
aaa,bbb,{'foo': 'xxx', 'bar': 'zzz'}
我想阅读它并转换为以下类型模式:
field1: String,
field2: String,
field3: Map[String, String]
我可以用这样的原始类型来做到这一点:
private val someSchema =
StructType(
StructField("field1", StringType, true) ::
StructField("field2", StringType, true) ::
StructField("field3", StringType, true) :: Nil)
spark.read
.format("csv")
.option("header", true)
.schema(someSchema)
.load("path.csv")
但是当涉及到 Map[String, String] 时,由于
它不起作用
Exception in thread "main" org.apache.spark.sql.AnalysisException: CSV
data source does not support map<string,string> data type.
我怎样才能用其他方式做到这一点?
您需要将其作为字符串读取,然后使用 from_json
函数将其转换为 MapType 以将其解析为 map<string,string>
:
val result = df.withColumn(
"field3",
from_json(col("field3"), lit("map<string,string>"))
)
但是,从您的示例来看,文件中的值似乎没有转义,spark 将无法解析它,因为列 field3
.
的值中有逗号(分隔符)
在这种情况下,您可以将文件作为文本读取,然后将 {}
中的逗号替换为另一个分隔符,例如 ;
,然后用 ,
拆分以获得 3 列,然后使用 str_to_map
函数将列 field3
转换为映射:
val df = spark.text("/path/file.csv")
val result = df.withColumn(
"value",
split(regexp_replace(col("value"), ",(?=[^{}]*\})", ";"), ",")
).select(
col("value")(0).as("field1"),
col("value")(1).as("field2"),
regexp_replace(col("value")(2), "[{}' ]", "").as("field3")
).withColumn(
"field3",
expr("str_to_map(field3, ';', ':')")
)
result.show
//+------+------+------------------------+
//|field1|field2|field3 |
//+------+------+------------------------+
//|aaa |bbb |[foo -> xxx, bar -> zzz]|
//+------+------+------------------------+
我有一个包含多列的 .csv
文件。
一行为例:
aaa,bbb,{'foo': 'xxx', 'bar': 'zzz'}
我想阅读它并转换为以下类型模式:
field1: String,
field2: String,
field3: Map[String, String]
我可以用这样的原始类型来做到这一点:
private val someSchema =
StructType(
StructField("field1", StringType, true) ::
StructField("field2", StringType, true) ::
StructField("field3", StringType, true) :: Nil)
spark.read
.format("csv")
.option("header", true)
.schema(someSchema)
.load("path.csv")
但是当涉及到 Map[String, String] 时,由于
它不起作用Exception in thread "main" org.apache.spark.sql.AnalysisException: CSV data source does not support map<string,string> data type.
我怎样才能用其他方式做到这一点?
您需要将其作为字符串读取,然后使用 from_json
函数将其转换为 MapType 以将其解析为 map<string,string>
:
val result = df.withColumn(
"field3",
from_json(col("field3"), lit("map<string,string>"))
)
但是,从您的示例来看,文件中的值似乎没有转义,spark 将无法解析它,因为列 field3
.
在这种情况下,您可以将文件作为文本读取,然后将 {}
中的逗号替换为另一个分隔符,例如 ;
,然后用 ,
拆分以获得 3 列,然后使用 str_to_map
函数将列 field3
转换为映射:
val df = spark.text("/path/file.csv")
val result = df.withColumn(
"value",
split(regexp_replace(col("value"), ",(?=[^{}]*\})", ";"), ",")
).select(
col("value")(0).as("field1"),
col("value")(1).as("field2"),
regexp_replace(col("value")(2), "[{}' ]", "").as("field3")
).withColumn(
"field3",
expr("str_to_map(field3, ';', ':')")
)
result.show
//+------+------+------------------------+
//|field1|field2|field3 |
//+------+------+------------------------+
//|aaa |bbb |[foo -> xxx, bar -> zzz]|
//+------+------+------------------------+