如何在 PysPark 中将数组结构拆分为 csv

How to split an array structure to csv in PysPark

这是一个示例数据和架构:

mySchema = StructType([
   StructField('firstname', StringType()),
   StructField('lastname', StringType()),
   StructField('langages', ArrayType(StructType([
         StructField('lang1', StringType()),
         StructField('lang2', StringType())
 ])))
])

myData = [("john", "smith", [
            {'lang1': 'Java', 'lang2': 'Python'},
            {'lang1': 'C', 'lang2': 'R'},
            {'lang1': 'Perl', 'lang2': 'Scala'}
            ]),
          ("robert", "plant", [
            {'lang1': 'C', 'lang2': 'Java'},
            {'lang1': 'Python', 'lang2': 'Perl'}
            ])
          ]

然后创建数据框:

df = spark.createDataFrame(data=myData, schema=mySchema)

架构如下:

df.printSchema()
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- langages: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- lang1: string (nullable = true)
|    |    |-- lang2: string (nullable = true)

当 show() 时:

df.show(df.count(), False)
+---------+--------+---------------------------------------+
|firstname|lastname|langages                               |
+---------+--------+---------------------------------------+
|john     |smith   |[[Java, Python], [C, R], [Perl, Scala]]|
|robert   |plant   |[[C, Java], [Python, Perl]]            |
+---------+--------+---------------------------------------+

至此,万事大吉。 但是现在,我想“扁平化”数组,为每个“语言”创建一个列,将语言与“/”连接起来,以便将其导出为 csv。 它可能看起来像这样:

firstname   lastname    langage_1    langage_2    langage_3 
john        smith       Java/Python  C/R          Perl/Scala
robert      plant       C/Java       Python/Perl

我试过像这样创建 3 列:

df.select([(col("langages")[x]).alias("langage_"+str(x+1)) for x in range(0, 3)]).show()
+--------------+--------------+-------------+
|     langage_1|     langage_2|    langage_3|
+--------------+--------------+-------------+
|[Java, Python]|        [C, R]|[Perl, Scala]|
|     [C, Java]|[Python, Perl]|         null|
+--------------+--------------+-------------+

我的问题是有时 langages 数组可能有 2 或 3 或 4 或任何元素。

所以 range(0, 3) 可能是 range(0, 4) !

我必须找到数组的最大元素

而且我不知道如何连接每个数组,例如 [Java、Python] 以获得“Java/Python”

感谢您的帮助

首先我们收集所需的最大列数。然后使用此值创建列。

n = df.select(F.max(F.size("langages")).alias("n_columns")).first().n_columns

df.select(
    "firstname",
    "lastname",
    *[F.concat_ws("/", F.array(
        F.col("langages").getItem(i).getItem("lang1"),
        F.col("langages").getItem(i).getItem("lang2"),
    )).alias("langages") for i in range(n)]
)