如何将数据列表添加到 pyspark 的 dataschame 中?

How to add a list of data into dataschame in pyspark?

有一个数据框包含两列,一列是键,另一列是值。如下所示:

+--------------------+--------------------+
|                 key|              value |
+--------------------+--------------------+
|a                   |abcde               |

我想根据位置将值分割成多个值,并在键后生成一个新的数据帧。如下所示:

+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|a                   |[a, 0]              |
|a                   |[b, 1]              |
|a                   |[c, 2]              |
|a                   |[d, 3]              |
|a                   |[e, 4]              |

我尝试过使用 join() 和 StructType() 但我失败了。有没有可能的方法来做到这一点?谢谢!

from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("a","abcd",3000)]
df = spark.createDataFrame(data,["Name","Department","Salary"])
df.select(  f.expr( "posexplode( filter (split(Department,''), x -> x != '' ) ) as (value, index)") , f.col("Name")).show()
+-----+-----+----+
|value|index|Name|
+-----+-----+----+
|    0|    a|   a|
|    1|    b|   a|
|    2|    c|   a|
|    3|    d|   a|
+-----+-----+----+

# to explain the above functions more clearly.
f.expr( "..." ) #--> write a SQL expression
 "posexplode( [..] )" #for an array turn them into rows with their index
  "filter( [..] , x -> x != '' )" #for an array filter out ''
   "split( Department, '' )" #split the column on null (extract characters) which will add null in out array that we need to filter out.

这是根据您的具体要求进行的更新,只需稍加操作即可将其转换为您需要的格式:

df.select(  f.expr( "posexplode( filter (split(Department,''), x -> x != '' ) ) as (myvalue, index)") , f.col("Name"),f.expr( "array(myvalue,index) as value ")).drop("index","myvalue").show()
+----+------+
|Name| value|
+----+------+
|   a|[0, a]|
|   a|[1, b]|
|   a|[2, c]|
|   a|[3, d]|
+----+------+

以下代码段将您的数据转换为您指定的格式:

import pyspark.sql.functions as F
df = spark.createDataFrame([("a", "abcde",)], ["key", "value"])
df_split = df.withColumn("split", F.array_remove(F.split("value", ""), ""))
df_split.show()
df_exploded = df_split.select("key", F.posexplode("split"))
df_exploded.show()
df_array = df_exploded.select("key", F.array("col", "pos").alias("value"))
df_array.show()

输出:

+---+-----+---------------+
|key|value|          split|
+---+-----+---------------+
|  a|abcde|[a, b, c, d, e]|
+---+-----+---------------+

+---+---+---+
|key|pos|col|
+---+---+---+
|  a|  0|  a|
|  a|  1|  b|
|  a|  2|  c|
|  a|  3|  d|
|  a|  4|  e|
+---+---+---+

+---+------+
|key| value|
+---+------+
|  a|[a, 0]|
|  a|[b, 1]|
|  a|[c, 2]|
|  a|[d, 3]|
|  a|[e, 4]|
+---+------+

首先将字符串拆分成一个数组,拆分方式为空字符串。因此,需要删除最后一个元素。

然后将 split 列数组的每个元素转换为一行,其在数组中的位置为 pos

最后,将列组合成一个数组。