How to merge two lists position-wise in a dataframe using PySpark
I have a dataframe as below.
Current dataframe:
+---+--------+----------+
| id|    size| variantID|
+---+--------+----------+
|  1| [10,20]| [150,160]|
|  2|     [2]|       [1]|
|  3|      []|        []|
+---+--------+----------+
I want to add a new column by merging the size array and the variantID array position-wise, joining each pair with the pipe symbol (|). The result should be a new array column named sizeMap. The size column always has the same number of elements as the variantID column.
Expected output:
+---+--------+----------+----------------+
| id|    size| variantID|         sizeMap|
+---+--------+----------+----------------+
|  1| [10,20]| [150,160]|[10|150, 20|160]|
|  2|     [2]|       [1]|           [2|1]|
|  3|      []|        []|              []|
+---+--------+----------+----------------+
Can you help me with this?
I have the following working solution, but it is likely to be slow on large data because of the UDF (a UDF-free alternative sketch follows the output below).
The last column also ends up as a string, since it embeds the pipe character "|".
from pyspark.sql.functions import *
from pyspark.sql.types import *
values = [(1, [10, 20], [150, 160]),
          (2, [2], [2|1]),  # note: 2|1 is Python's bitwise OR and evaluates to 3, hence [3] in the output below
          (3, [], [])]
rdd = sc.parallelize(values)
schema = StructType([StructField("id", IntegerType(), True),
                     StructField("size", ArrayType(IntegerType()), True),
                     StructField("variantID", ArrayType(IntegerType()), True)])
data = spark.createDataFrame(rdd, schema)
data.show()
"""
+---+--------+----------+
| id| size| variantID|
+---+--------+----------+
| 1|[10, 20]|[150, 160]|
| 2| [2]| [3]|
| 3| []| []|
+---+--------+----------+
"""
def arrangeAsRequired(inputString):
    # input looks like "[10, 20]&[150, 160]"; strip the brackets first
    inputString = inputString.replace("[", "").replace("]", "")
    if inputString.strip() in "[]&[]":
        # both arrays were empty, so only "&" is left after stripping the brackets
        sizeMapPopulated = "[]"
    else:
        firstArray = inputString.split("&")[0].split(",")
        secondArray = inputString.split("&")[1].split(",")
        # pair the elements up by position, joined with "|"
        sizeMapPopulated = [str(firstArray[x]) + "|" + str(secondArray[x]) for x in range(0, len(firstArray), 1)]
    return str(sizeMapPopulated)
udfToReturnData = udf(lambda z: arrangeAsRequired(z), StringType())
spark.udf.register("udfToReturnData", udfToReturnData)
data = data.withColumn("sizeMap", \
    udfToReturnData(concat(col("size").cast("string"), lit("&"), col("variantID").cast("string")).cast("string"))) \
    .select("id", "size", "sizeMap")
data.show(20,False)
"""
+---+--------+----------------------+
|id |size |sizeMap |
+---+--------+----------------------+
|1 |[10, 20]|['10|150', ' 20| 160']|
|2 |[2] |['2|3'] |
|3 |[] |[] |
+---+--------+----------------------+
"""
Perhaps this is helpful (written in Scala, but it can be used in PySpark with only small changes; a PySpark port follows the Scala listing below).
Load the provided test data
val df =
  spark.sql(
    """
      |select id, size, variantID from values
      | (1, array(10, 20), array(150, 160)),
      | (2, array(2), array(1)),
      | (3, array(null), array(null))
      | T(id, size, variantID)
      """.stripMargin)
df.show(false)
df.printSchema()
/**
* +---+--------+----------+
* |id |size |variantID |
* +---+--------+----------+
* |1 |[10, 20]|[150, 160]|
* |2 |[2] |[1] |
* |3 |[] |[] |
* +---+--------+----------+
*
* root
* |-- id: integer (nullable = false)
* |-- size: array (nullable = false)
* | |-- element: integer (containsNull = true)
* |-- variantID: array (nullable = false)
* | |-- element: integer (containsNull = true)
*/
Zip the two arrays position-wise (no UDF)
val p = df.withColumn("sizeMap", arrays_zip($"size", $"variantID"))
  .withColumn("sizeMap", expr("TRANSFORM(sizeMap, x -> concat_ws('|', x.size, x.variantID))"))
p.show(false)
p.printSchema()
/**
* +---+--------+----------+----------------+
* |id |size |variantID |sizeMap |
* +---+--------+----------+----------------+
* |1 |[10, 20]|[150, 160]|[10|150, 20|160]|
* |2 |[2] |[1] |[2|1] |
* |3 |[] |[] |[] |
* +---+--------+----------+----------------+
*
* root
* |-- id: integer (nullable = false)
* |-- size: array (nullable = false)
* | |-- element: integer (containsNull = true)
* |-- variantID: array (nullable = false)
* | |-- element: integer (containsNull = true)
* |-- sizeMap: array (nullable = false)
* | |-- element: string (containsNull = false)
*/
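As promised above, here is roughly what the same arrays_zip + TRANSFORM approach looks like in PySpark. This is only a sketch, assuming Spark 2.4+ (needed for arrays_zip and the TRANSFORM higher-order SQL function); the test data and column names are the ones from the question.

from pyspark.sql import functions as F

# test data equivalent to the Scala example above
df = spark.createDataFrame(
    [(1, [10, 20], [150, 160]),
     (2, [2], [1]),
     (3, [], [])],
    "id int, size array<int>, variantID array<int>")

# arrays_zip pairs the two arrays position-wise into an array of structs,
# then TRANSFORM turns each struct into a "size|variantID" string
p = df.withColumn("sizeMap", F.arrays_zip("size", "variantID")) \
      .withColumn("sizeMap", F.expr("TRANSFORM(sizeMap, x -> concat_ws('|', x.size, x.variantID))"))
p.show(20, False)
# expected to match the Scala output above: [10|150, 20|160], [2|1], []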