Pyspark eval or expr - Concatenating multiple dataframe columns using when statement
I am trying to concatenate multiple dataframe columns, but I can't get pyspark eval or expr to work with concat_ws inside the when statement below.
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, concat, when, col, expr, lit, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar"), ("ba z", None)], ('a', 'b'))

keys = ['a', 'b']
key_val = ''
for key in keys:
    key_val = key_val + 'when(df["{0}"].isNull(), lit("_")).otherwise(df["{0}"]),'.format(key)
key_val_exp = key_val.rsplit(',', 1)[0]

spaceDeleteUDF = udf(lambda s: str(s).replace(" ", "_").strip(), StringType())
df = df.withColumn("unique_id", spaceDeleteUDF(concat_ws("-", eval(key_val_exp))))
Error:
"TypeError: Invalid argument, not a string or column: (Column<b'CASE WHEN (a IS NULL) THEN _ ELSE a END'>, Column<b'CASE WHEN (b IS NULL) THEN _ ELSE b END'>) of type <class 'tuple'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function."
Expected output:
+----+----+---------+
| a| b|unique_id|
+----+----+---------+
| foo| bar| foo-bar|
|ba z|null| ba_z-_|
+----+----+---------+
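Why this fails: eval(key_val_exp) evaluates the comma-joined string to a Python tuple of Columns, and concat_ws then receives that tuple as a single argument, hence the TypeError. A minimal sketch of a direct fix, reusing df, keys, and spaceDeleteUDF from the snippet above, is to build the Column list without eval and unpack it with *:

# Build the CASE WHEN columns as a plain Python list -- no eval needed.
cols = [when(df[k].isNull(), lit("_")).otherwise(df[k]) for k in keys]
# Unpack the list so concat_ws receives Columns rather than one tuple.
df = df.withColumn("unique_id", spaceDeleteUDF(concat_ws("-", *cols)))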
Check this out.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar"), ("ba z", None)], ('a', 'b'))
df.show()
# +----+----+
# | a| b|
# +----+----+
# | foo| bar|
# |ba z|null|
# +----+----+
# Step 1: add a *_mod copy of each column with nulls replaced by '_'.
df1 = df.select(
    *[F.col(c) for c in df.columns],
    *[F.when(F.col(c).isNull(), F.lit('_')).otherwise(F.col(c)).alias(c + '_mod')
      for c in df.columns],
)
# Step 2: replace any whitespace in the *_mod columns with underscores.
df2 = df1.select(
    *[F.col(c) for c in df1.columns if '_mod' not in c],
    *[F.regexp_replace(c, r'\s', '_').alias(c) for c in df1.columns if '_mod' in c],
)
# Step 3: join the cleaned *_mod columns with '-' into unique_id.
df3 = df2.select(
    *[F.col(c) for c in df2.columns if '_mod' not in c],
    F.concat_ws('-', *[F.col(c) for c in df2.columns if '_mod' in c]).alias('unique_id'),
)
df3.show()
# +----+----+---------+
# | a| b|unique_id|
# +----+----+---------+
# | foo| bar| foo-bar|
# |ba z|null| ba_z-_|
# +----+----+---------+
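If a single expression is preferred over the three intermediate DataFrames, the same result can be sketched in one select (df_compact is just an illustrative name; df and F are as defined above):

# Chain regexp_replace over each when expression, then join with '-'.
df_compact = df.select(
    *df.columns,
    F.concat_ws(
        '-',
        *[F.regexp_replace(
              F.when(F.col(c).isNull(), F.lit('_')).otherwise(F.col(c)),
              r'\s', '_')
          for c in df.columns],
    ).alias('unique_id'),
)
df_compact.show()  # same output as df3 above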