将列作为结构数组的 Spark 转储到镶木地板

Spark dump to parquet with column as array of structures

我需要加载一个 csv 文件,其中有一列包含结构数组,并将其转储到镶木地板格式的另一个位置。 我的 csv 文件有两列,A 列和 B 列。 B列数据类型为array<struct<x: bigint, y:bigint>>

我尝试使用如下架构加载 csv 文件:

val schemaB = ArrayType(StructType(Seq(StructField("x",LongType),StructField("y",LongType))))
val schema = new StructType().add("A",StringType).add("B",schemaB)
spark.read.option("sep", "\t").schema(schema).csv(<location>)

但是,这没有用。我收到以下错误:

org.apache.spark.sql.AnalysisException: CSV data source does not support array<struct<x:bigint,y:bigint>&gt; data type.;</struct<x:bigint,y:bigint>

我什至尝试转换为所需的类型,但这没有用。

这是 B 列的外观示例:

|B                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------+
|68222:102332,21215:1000,10982:70330,|
|93302:13320,263721:902615,9382:100020,|

如果您使用的是最新版本的 spark 即 2.4+

,则可以使用 transform 函数

首先作为字符串读取,split 通过 "," 获取列表,再次 split 通过 ":" 获取 xy

val schema = new StructType().add("A",StringType).add("B",StringType)
val df = spark.read.option("delimiter", "\t").schema(schema).csv("path to csv")
val splitExpr =  expr("transform(split(B, ','), x -> (split(x, ':')[0] as x, split(x, ':')[1] as y))")

val result = df.select($"A", splitExpr.cast("array<struct<x: long, y:long>>") as "B" )

现在你可以把它保存在拼花地板上 如果您使用的是旧版本的 spark,那么您需要编写一个 udf 最终架构:

root
 |-- A: string (nullable = true)
 |-- B: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)