如何在 PysPark 中将数组结构拆分为 csv
How to split an array structure to csv in PysPark
这是一个示例数据和架构:
mySchema = StructType([
StructField('firstname', StringType()),
StructField('lastname', StringType()),
StructField('langages', ArrayType(StructType([
StructField('lang1', StringType()),
StructField('lang2', StringType())
])))
])
myData = [("john", "smith", [
{'lang1': 'Java', 'lang2': 'Python'},
{'lang1': 'C', 'lang2': 'R'},
{'lang1': 'Perl', 'lang2': 'Scala'}
]),
("robert", "plant", [
{'lang1': 'C', 'lang2': 'Java'},
{'lang1': 'Python', 'lang2': 'Perl'}
])
]
然后创建数据框:
df = spark.createDataFrame(data=myData, schema=mySchema)
架构如下:
df.printSchema()
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- langages: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lang1: string (nullable = true)
| | |-- lang2: string (nullable = true)
当 show() 时:
df.show(df.count(), False)
+---------+--------+---------------------------------------+
|firstname|lastname|langages |
+---------+--------+---------------------------------------+
|john |smith |[[Java, Python], [C, R], [Perl, Scala]]|
|robert |plant |[[C, Java], [Python, Perl]] |
+---------+--------+---------------------------------------+
至此,万事大吉。
但是现在,我想“扁平化”数组,为每个“语言”创建一个列,将语言与“/”连接起来,以便将其导出为 csv。
它可能看起来像这样:
firstname lastname langage_1 langage_2 langage_3
john smith Java/Python C/R Perl/Scala
robert plant C/Java Python/Perl
我试过像这样创建 3 列:
df.select([(col("langages")[x]).alias("langage_"+str(x+1)) for x in range(0, 3)]).show()
+--------------+--------------+-------------+
| langage_1| langage_2| langage_3|
+--------------+--------------+-------------+
|[Java, Python]| [C, R]|[Perl, Scala]|
| [C, Java]|[Python, Perl]| null|
+--------------+--------------+-------------+
我的问题是有时 langages 数组可能有 2 或 3 或 4 或任何元素。
所以 range(0, 3) 可能是 range(0, 4) !
我必须找到数组的最大元素
而且我不知道如何连接每个数组,例如 [Java、Python] 以获得“Java/Python”
感谢您的帮助
首先我们收集所需的最大列数。然后使用此值创建列。
n = df.select(F.max(F.size("langages")).alias("n_columns")).first().n_columns
df.select(
"firstname",
"lastname",
*[F.concat_ws("/", F.array(
F.col("langages").getItem(i).getItem("lang1"),
F.col("langages").getItem(i).getItem("lang2"),
)).alias("langages") for i in range(n)]
)
这是一个示例数据和架构:
mySchema = StructType([
StructField('firstname', StringType()),
StructField('lastname', StringType()),
StructField('langages', ArrayType(StructType([
StructField('lang1', StringType()),
StructField('lang2', StringType())
])))
])
myData = [("john", "smith", [
{'lang1': 'Java', 'lang2': 'Python'},
{'lang1': 'C', 'lang2': 'R'},
{'lang1': 'Perl', 'lang2': 'Scala'}
]),
("robert", "plant", [
{'lang1': 'C', 'lang2': 'Java'},
{'lang1': 'Python', 'lang2': 'Perl'}
])
]
然后创建数据框:
df = spark.createDataFrame(data=myData, schema=mySchema)
架构如下:
df.printSchema()
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- langages: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lang1: string (nullable = true)
| | |-- lang2: string (nullable = true)
当 show() 时:
df.show(df.count(), False)
+---------+--------+---------------------------------------+
|firstname|lastname|langages |
+---------+--------+---------------------------------------+
|john |smith |[[Java, Python], [C, R], [Perl, Scala]]|
|robert |plant |[[C, Java], [Python, Perl]] |
+---------+--------+---------------------------------------+
至此,万事大吉。 但是现在,我想“扁平化”数组,为每个“语言”创建一个列,将语言与“/”连接起来,以便将其导出为 csv。 它可能看起来像这样:
firstname lastname langage_1 langage_2 langage_3
john smith Java/Python C/R Perl/Scala
robert plant C/Java Python/Perl
我试过像这样创建 3 列:
df.select([(col("langages")[x]).alias("langage_"+str(x+1)) for x in range(0, 3)]).show()
+--------------+--------------+-------------+
| langage_1| langage_2| langage_3|
+--------------+--------------+-------------+
|[Java, Python]| [C, R]|[Perl, Scala]|
| [C, Java]|[Python, Perl]| null|
+--------------+--------------+-------------+
我的问题是有时 langages 数组可能有 2 或 3 或 4 或任何元素。
所以 range(0, 3) 可能是 range(0, 4) !
我必须找到数组的最大元素
而且我不知道如何连接每个数组,例如 [Java、Python] 以获得“Java/Python”
感谢您的帮助
首先我们收集所需的最大列数。然后使用此值创建列。
n = df.select(F.max(F.size("langages")).alias("n_columns")).first().n_columns
df.select(
"firstname",
"lastname",
*[F.concat_ws("/", F.array(
F.col("langages").getItem(i).getItem("lang1"),
F.col("langages").getItem(i).getItem("lang2"),
)).alias("langages") for i in range(n)]
)