PySpark - Explode columns into rows and set values based on logic
Given a DataFrame:
+---+-----------+---------+-------+------------+
| id|      score|tx_amount|isValid|    greeting|
+---+-----------+---------+-------+------------+
|  1|        0.2|    23.78|   true| hello_world|
|  2|        0.6|    12.41|  false|byebye_world|
+---+-----------+---------+-------+------------+
I want to explode these columns into rows under a single column named "col_value". That part works, but I also want to apply logic to each row so that I get the following result:
+---+------------+--------+---------+----------+-------+
| id|   col_value|is_score|is_amount|is_boolean|is_text|
+---+------------+--------+---------+----------+-------+
|  1|         0.2|       Y|        N|         N|      N|
|  1|       23.78|       N|        Y|         N|      N|
|  1|        true|       N|        N|         Y|      N|
|  1| hello_world|       N|        N|         N|      Y|
|  2|         0.6|       Y|        N|         N|      N|
|  2|       12.41|       N|        Y|         N|      N|
|  2|       false|       N|        N|         Y|      N|
|  2|byebye_world|       N|        N|         N|      Y|
+---+------------+--------+---------+----------+-------+
What I have so far:
.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting")))) \
.select("id", F.col("cols.*")) \
.withColumnRenamed("0", "col_value") \
.withColumn("is_score", F.lit("Y") if col1_type == "score" else F.lit("N")) \
.withColumn("is_amount", F.lit("Y") if col2_type == "amount" else F.lit("N")) \
.withColumn("is_boolean", F.lit("Y") if col3_type == "boolean" else F.lit("N")) \
.withColumn("is_text", F.lit("Y") if col4_type == "text" else F.lit("N")) \
.show()
But this gives the wrong output, because every flag column gets the same value on every row:
+---+------------+--------+---------+----------+-------+
| id|   col_value|is_score|is_amount|is_boolean|is_text|
+---+------------+--------+---------+----------+-------+
|  1|         0.2|       Y|        Y|         Y|      Y|
|  1|       23.78|       Y|        Y|         Y|      Y|
|  1|        true|       Y|        Y|         Y|      Y|
|  1| hello_world|       Y|        Y|         Y|      Y|
|  2|         0.6|       Y|        Y|         Y|      Y|
|  2|       12.41|       Y|        Y|         Y|      Y|
|  2|       false|       Y|        Y|         Y|      Y|
|  2|byebye_world|       Y|        Y|         Y|      Y|
+---+------------+--------+---------+----------+-------+
How can I get the correct result after the explode?
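I think you can get what you want by applying a regex to your col_value to determine whether it is text, boolean, amount or score. The code below works as long as score never exceeds 1.0 and amount is always above 1.0. If that is not the case, let me know and I will update the logic.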
from pyspark.sql import functions as F

(
    df.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting"))))
    .select("id", F.col("cols.*"))
    .withColumnRenamed("0", "col_value")
    # First run of letters in the value ("true", "hello", ...); empty for numbers.
    .withColumn("text", F.regexp_extract(F.col("col_value"), "([A-Za-z]+)", 1))
    # Keep "true"/"false" as the boolean marker and clear them from text.
    .withColumn("boolean", F.when((F.col("text") == "true") | (F.col("text") == "false"), F.col("text")).otherwise(F.lit("")))
    .withColumn("text", F.when(F.col("text") == F.col("boolean"), F.lit("")).otherwise(F.col("text")))
    # First run of digits, i.e. the integer part of a decimal such as 23.78.
    .withColumn("numeric", F.regexp_extract(F.col("col_value"), "([0-9]+)", 1))
    # Derive the Y/N flags from the helper columns.
    .withColumn("is_text", F.when(F.col("text") != "", F.lit("Y")).otherwise(F.lit("N")))
    .withColumn("is_score", F.when(F.col("numeric") <= 1, F.lit("Y")).otherwise(F.lit("N")))
    .withColumn("is_amount", F.when(F.col("numeric") > 1, F.lit("Y")).otherwise(F.lit("N")))
    .withColumn("is_boolean", F.when(F.col("boolean") != "", F.lit("Y")).otherwise(F.lit("N")))
    .select("id", "col_value", "is_score", "is_amount", "is_boolean", "is_text")
    .show()
)
+---+------------+--------+---------+----------+-------+
| id|   col_value|is_score|is_amount|is_boolean|is_text|
+---+------------+--------+---------+----------+-------+
|  1|         0.2|       Y|        N|         N|      N|
|  1|       23.78|       N|        Y|         N|      N|
|  1|        true|       N|        N|         Y|      N|
|  1| hello_world|       N|        N|         N|      Y|
|  2|         0.6|       Y|        N|         N|      N|
|  2|       12.41|       N|        Y|         N|      N|
|  2|       false|       N|        N|         Y|      N|
|  2|byebye_world|       N|        N|         N|      Y|
+---+------------+--------+---------+----------+-------+
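To make the regex rules in this answer easier to check by hand, here is a minimal plain-Python sketch of the same classification. It is only an illustration: `classify` is a hypothetical helper, not a PySpark function, and it assumes Spark's non-ANSI behaviour, where comparing an empty string with a number matches neither branch.

```python
import re


def classify(col_value: str) -> dict:
    """Plain-Python mirror of the regex rules above (illustrative only)."""
    # First run of ASCII letters, like regexp_extract(col, "([A-Za-z]+)", 1).
    match = re.search(r"[A-Za-z]+", col_value)
    text = match.group(0) if match else ""

    # "true"/"false" count as boolean and are cleared from text.
    boolean = text if text in ("true", "false") else ""
    if text == boolean:
        text = ""

    # First run of digits: for a decimal this is only the integer part.
    match = re.search(r"[0-9]+", col_value)
    numeric = int(match.group(0)) if match else None

    return {
        "is_score": "Y" if numeric is not None and numeric <= 1 else "N",
        "is_amount": "Y" if numeric is not None and numeric > 1 else "N",
        "is_boolean": "Y" if boolean else "N",
        "is_text": "Y" if text else "N",
    }


for value in ["0.2", "23.78", "true", "hello_world", "1.5", "user42"]:
    print(value, classify(value))
```

The first four values get the flags shown in the table above. The last two show the limits of guessing from the value: "1.5" is flagged as a score because only the integer part "1" is compared, and "user42" is flagged as both text and amount.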