Pyspark SQL: How to make a new value from a column's value plus another column's name?
I'm trying to produce a new value that comes from a column's value plus another column's name.
For example, given this:
+----+---+----+----+
|base|  1|   2|   3|
+----+---+----+----+
|  10| AA|  aa|  Aa|
|  20| BB|  bb|  Bb|
|  30| CC|  cc|  Cc|
+----+---+----+----+
I want this:
          +---------+----+
          | new_base|   v|
          +---------+----+
10 + 1 -> |       11|  AA|
10 + 2 -> |       12|  aa|
10 + 3 -> |       13|  Aa|
20 + 1 -> |       21|  BB|
20 + 2 -> |       22|  bb|
20 + 3 -> |       23|  Bb|
30 + 1 -> |       31|  CC|
30 + 2 -> |       32|  cc|
30 + 3 -> |       33|  Cc|
          +---------+----+
Note: I'm coding in Spark 2.4.
We can solve this with the explode function.
# Importing requisite functions.
from pyspark.sql.functions import array, col, explode, struct, lit
from pyspark.sql.types import IntegerType
# Creating the DataFrame
df = sqlContext.createDataFrame([(10,'AA','aa','Aa'),(20,'BB','bb','Bb'),(30,'CC','cc','Cc')],['base','1','2','3'])
df.show()
+----+---+---+---+
|base|  1|  2|  3|
+----+---+---+---+
|  10| AA| aa| Aa|
|  20| BB| bb| Bb|
|  30| CC| cc| Cc|
+----+---+---+---+
Writing a function to explode the DataFrame:
def to_explode(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")
    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
Applying the function. Since the arithmetic that builds new_base produces a decimal, its type is double by default, so we explicitly cast it to integer to avoid every number being suffixed with .0.
df = to_explode(df, ['base'])
df = df.withColumn('new_base', col('base') + col('key'))\
       .select(col('new_base').cast(IntegerType()), 'val')
df.show()
+--------+---+
|new_base|val|
+--------+---+
|      11| AA|
|      12| aa|
|      13| Aa|
|      21| BB|
|      22| bb|
|      23| Bb|
|      31| CC|
|      32| cc|
|      33| Cc|
+--------+---+
df.printSchema()
root
 |-- new_base: integer (nullable = true)
 |-- val: string (nullable = true)
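As an aside, the same unpivot can also be expressed with Spark SQL's stack generator instead of the hand-rolled array-of-structs; a minimal sketch, assuming df still refers to the original wide DataFrame with columns base, 1, 2, 3 (the unpivoted variable name is just illustrative):
from pyspark.sql.functions import expr, col
from pyspark.sql.types import IntegerType
# stack(3, ...) emits one (key, val) pair per value column; backticks quote the numeric column names.
unpivoted = df.select('base', expr("stack(3, '1', `1`, '2', `2`, '3', `3`) as (key, val)"))
unpivoted.select((col('base') + col('key')).cast(IntegerType()).alias('new_base'), 'val').show()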
Another typical approach, using the reduce function:
from functools import reduce
from pyspark.sql.functions import col

# Every column after 'base' holds a value to unpivot; build one two-column
# DataFrame per value column, then union them all together.
cols = df.columns[1:]
df_new = reduce(lambda d1, d2: d1.union(d2),
                [df.select((col('base') + int(c)).astype('int').alias('new_base'),
                           col(c).alias('v')) for c in cols])
df_new.show()
+--------+---+
|new_base|  v|
+--------+---+
|      11| AA|
|      21| BB|
|      31| CC|
|      12| aa|
|      22| bb|
|      32| cc|
|      13| Aa|
|      23| Bb|
|      33| Cc|
+--------+---+
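Because the unions simply concatenate the per-column DataFrames, the rows come out grouped by source column rather than by base. If the row order of the explode-based answer is preferred, a sort on new_base restores it:
# Optional: order the unioned rows the same way as the explode-based output.
df_new.orderBy('new_base').show()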