How to concat two ArrayType(StringType()) columns element-wise in Pyspark?
I have two ArrayType(StringType()) columns in a Spark DataFrame, and I would like to concatenate the two columns element-wise:
Input:
+-------------+-------------+
|col1 |col2 |
+-------------+-------------+
|['a','b'] |['c','d'] |
|['a','b','c']|['e','f','g']|
+-------------+-------------+
Output:
+-------------+-------------+----------------+
|col1 |col2 |col3 |
+-------------+-------------+----------------+
|['a','b'] |['c','d'] |['ac', 'bd'] |
|['a','b','c']|['e','f','g']|['ae','bf','cg']|
+-------------+-------------+----------------+
Thanks.
It won't really scale, but you could take the 0th and 1st entry in each array and say that col3 is a[0] + b[0], then a[1] + b[1]. Make all four entries separate values and then combine them for the output.
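A minimal sketch of that per-index approach, assuming the fixed two-element rows from the question (column names and sample values are taken from the example above):
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, concat
spark = SparkSession.builder.getOrCreate()
# Sample row from the question, restricted to the fixed two-element case.
df = spark.createDataFrame([(["a", "b"], ["c", "d"])], ["col1", "col2"])
# Index each array position explicitly and rebuild the result as an array.
df.withColumn("col3", array(concat(col("col1")[0], col("col2")[0]), concat(col("col1")[1], col("col2")[1]))).show()
# col3 comes out as [ac, bd], matching the desired output above.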
For Spark 2.4+, you can use the zip_with function:
zip_with(left, right, func) - Merges the two given arrays, element-wise, into a single array using function
df.withColumn("col3", expr("zip_with(col1, col2, (x, y) -> concat(x, y))")).show()
#+------+------+--------+
#|  col1|  col2|    col3|
#+------+------+--------+
#|[a, b]|[c, d]|[ac, bd]|
#+------+------+--------+
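For completeness, a self-contained sketch (assuming Spark 2.4+) that builds the sample data from the question and applies the same zip_with expression:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# Both sample rows from the question.
df = spark.createDataFrame([(["a", "b"], ["c", "d"]), (["a", "b", "c"], ["e", "f", "g"])], ["col1", "col2"])
# zip_with walks both arrays in lockstep and applies the lambda at each position.
df.withColumn("col3", expr("zip_with(col1, col2, (x, y) -> concat(x, y))")).show(truncate=False)
#+---------+---------+------------+
#|col1     |col2     |col3        |
#+---------+---------+------------+
#|[a, b]   |[c, d]   |[ac, bd]    |
#|[a, b, c]|[e, f, g]|[ae, bf, cg]|
#+---------+---------+------------+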
Another way, using the transform function:
df.withColumn("col3", expr("transform(col1, (x, i) -> concat(x, col2[i]))"))
The transform function takes the first array column col1 as an argument, iterates over its elements, and applies the lambda function (x, i) -> concat(x, col2[i]), where x is the actual element and i its index, which is used to fetch the corresponding element from the array col2.
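A quick check of the transform variant, reusing the df built in the sketch above; the result matches the desired output in the question:
# transform iterates over col1 and uses the index i to look up col2[i].
df.withColumn("col3", expr("transform(col1, (x, i) -> concat(x, col2[i]))")).show(truncate=False)
#+---------+---------+------------+
#|col1     |col2     |col3        |
#+---------+---------+------------+
#|[a, b]   |[c, d]   |[ac, bd]    |
#|[a, b, c]|[e, f, g]|[ae, bf, cg]|
#+---------+---------+------------+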
Here is a generic answer. Just look at the res column for the result. It assumes two arrays of equal size, i.e. both with n elements.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Generate sample numeric data: 3 rows, each with two arrays of the same length, as in your example.
df = spark.createDataFrame([ ( list([x,x+1,4, x+100]), 4, list([x+100,x+200,999,x+500]) ) for x in range(3)], ['array1', 'value1', 'array2'] )
num_array_elements = len(df.select("array1").first()[0])
# Concatenate element-wise, producing one output column per index position.
df2 = df.select([ concat(col("array1")[i], col("array2")[i]) for i in range(num_array_elements)])
df2.withColumn("res", array(df2.schema.names)).show(truncate=False)
This returns the per-index concatenations plus a res column that gathers them back into a single array.
Here is an alternative answer that addresses the updated (non-original) question. It uses array and array_except to demonstrate this kind of approach. The accepted answer is more elegant.
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Arbitrary max number of elements to apply array over; no need to broadcast such a small amount of data, afaik.
max_entries = 5
# Generate sample numeric data: 3 rows with two arrays of varying length, but constant length within each row.
dfA = spark.createDataFrame([ ( list([x,x+1,4, x+100]), 4, list([x+100,x+200,999,x+500]) ) for x in range(3)], ['array1', 'value1', 'array2'] ).withColumn("s",size(col("array1")))
dfB = spark.createDataFrame([ ( list([x,x+1]), 4, list([x+100,x+200]) ) for x in range(5)], ['array1', 'value1', 'array2'] ).withColumn("s",size(col("array1")))
df = dfA.union(dfB)
# concat the array elements which are variable in size up to a max amount.
df2 = df.select(( [concat(col("array1")[i], col("array2")[i]) for i in range(max_entries)]))
df3 = df2.withColumn("res", array(df2.schema.names))
# Get the results, but strip out null entries from the array.
df3.select(array_except(df3.res, array(lit(None)))).show(truncate=False)
It wasn't possible to get the s value of the column to pass into range.
This returns:
+------------------------------+
|array_except(res, array(NULL))|
+------------------------------+
|[0100, 1200, 4999, 100500] |
|[1101, 2201, 4999, 101501] |
|[2102, 3202, 4999, 102502] |
|[0100, 1200] |
|[1101, 2201] |
|[2102, 3202] |
|[3103, 4203] |
|[4104, 5204] |
+------------------------------+
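As a side note on that range limitation: Python's range needs a plain integer on the driver, not a Column, so one workaround (a sketch, reusing the unioned df and its size column s from the code above) is to collect the maximum array size first instead of hard-coding max_entries:
from pyspark.sql.functions import max as max_
# range() needs a plain Python int, so collect the largest array size (column s) back to the driver,
# then use it as the loop bound in range(max_entries).
max_entries = df.agg(max_("s")).first()[0]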