Pyspark SQL:如何创建一个来自列值加上另一个列名的新值?

Pyspark SQL: How to make a new value that comes from a value of column plus another column name?

我正在尝试获取一个新值,该值来自列的值加上另一个列名。

例如,鉴于此:

+----+---+----+----+
|base|  1|   2|   3|
+----+---+----+----+
|  10| AA|  aa|  Aa|
|  20| BB|  bb|  Bb|
|  30| CC|  cc|  Cc|
+----+---+----+----+

我想要这个:


          +---------+----+
          | new_base|   v| 
          +---------+----+
10 + 1 -> |       11|  AA|
10 + 2 -> |       12|  aa|
10 + 3 -> |       13|  Aa|
20 + 1 -> |       21|  BB| 
20 + 2 -> |       22|  bb|
20 + 3 -> |       23|  Bb| 
30 + 1 -> |       31|  CC|
30 + 2 -> |       32|  cc|
30 + 3 -> |       33|  Cc|
          +---------+----+

注意:我在 Spark 2.4 中编码

我们可以用explode函数来解决

# Importing requisite functions.
from pyspark.sql.functions import array, col, explode, struct, lit

# Creating the DataFrame
df = sqlContext.createDataFrame([(10,'AA','aa','Aa'),(20,'BB','bb','Bb'),(30,'CC','cc','Cc')],['base','1','2','3'])
df.show()
+----+---+---+---+
|base|  1|  2|  3|
+----+---+---+---+
|  10| AA| aa| Aa|
|  20| BB| bb| Bb|
|  30| CC| cc| Cc|
+----+---+---+---+

正在编写一个函数来分解 DataFrame

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

正在应用以下函数。由于创建的列 new_base 有一个 decimal,默认情况下它的类型是 double,因此我们明确地将其转换为 integer 以避免每个数字都以 [= 为后缀19=]

df = to_explode(df, ['base'])
df = df.withColumn('new_base',col('base')+col('key'))\
       .select(col('new_base').cast(IntegerType()),'val')
df.show()
+--------+---+
|new_base|val|
+--------+---+
|      11| AA|
|      12| aa|
|      13| Aa|
|      21| BB|
|      22| bb|
|      23| Bb|
|      31| CC|
|      32| cc|
|      33| Cc|
+--------+---+
df.printSchema()
root
 |-- new_base: integer (nullable = true)
 |-- val: string (nullable = true)

另一个使用reduce函数的典型案例:

from functools import reduce
from pyspark.sql.functions import col

cols = df.columns[1:]

df_new = reduce(lambda d1,d2: d1.union(d2),
    [ df.select((col('base') + int(c)).astype('int').alias('new_base'), col(c).alias('v')) for c in cols ]
)

df_new.show()
+--------+---+
|new_base|  v|
+--------+---+
|      11| AA|
|      21| BB|
|      31| CC|
|      12| aa|
|      22| bb|
|      32| cc|
|      13| Aa|
|      23| Bb|
|      33| Cc|
+--------+---+