How to modify a Pyspark dataframe using a for loop and case-when conditionals?
I am trying to modify a Spark dataframe so that, based on a filter value, the subset is split into several conditions, and a variable is then flagged/modified according to those conditions.
Ideally, the final dataframe should be the same size as the original, just with the necessary modifications.
An example is below:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_fns

spark = SparkSession.builder.getOrCreate()

data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]
df = pd.DataFrame(
    data, columns=["ID", "Beverage", "Weight", "Sample", "Volume", "Amount"]
)
drink_dictionary = {"soda": {"LB/L": 100, "G/M2": 200},
                    "juice": {"KG/GA": 500, "LB/L": 90},
                    "water": {"LB/L": 1}}
sdf = spark.createDataFrame(df)
for drink in ["soda", "juice", "water"]:
    for mass_unit in drink_dictionary[drink].keys():
        weight_unit = mass_unit.split("/")[0]
        volume_unit = mass_unit.split("/")[1]
        value = drink_dictionary[drink][weight_unit + "/" + volume_unit]
        # Create a condition, i.e. a specific weight and volume pairing.
        # IF this condition is met, modify a different variable,
        # then loop to the next condition under the same Beverage type
        # and modify accordingly.
        condition = (spark_fns.col("Weight") == weight_unit) & (
            spark_fns.col("Volume") == volume_unit
        )
        new_sdf = (
            sdf.filter(spark_fns.col("Beverage") == drink)
            .withColumn("Flag", spark_fns.when(condition, True).otherwise(False))
            .withColumn(
                "corrected_amount",
                spark_fns.when(
                    condition,
                    spark_fns.expr(f"Amount / Sample * {value}"),
                ).otherwise(sdf["Amount"]),
            )
        )
This output is incorrect. I would like the output to look like the following (which would involve looping over all beverages):
ID  Beverage  Weight  Sample  Volume  Amount  Corrected_Amount
1   soda      LB      1       L       20      2000
2   juice     KG      1       GA      12      6000
3   water     LB      1       L       35      35
4   soda      G       1       M2      11      2200
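Part of the problem with the loop above is that each iteration rebuilds new_sdf from a filtered subset, so the columns written in earlier iterations are discarded. A join-friendly alternative is to flatten the nested dictionary into rows first. A minimal sketch in plain Python, using the same drink_dictionary as above (the name lookup_rows is illustrative):

```python
# Flatten the nested drink_dictionary into (Beverage, Weight, Volume, value)
# rows -- the shape needed for a lookup table that can be joined,
# instead of filtering per drink inside a loop.
drink_dictionary = {"soda": {"LB/L": 100, "G/M2": 200},
                    "juice": {"KG/GA": 500, "LB/L": 90},
                    "water": {"LB/L": 1}}

lookup_rows = [
    (drink, *unit.split("/"), value)
    for drink, units in drink_dictionary.items()
    for unit, value in units.items()
]
# lookup_rows could then be passed to
# spark.createDataFrame(lookup_rows, ["Beverage", "Weight", "Volume", "unit_value"])
# and joined on Beverage/Weight/Volume.
```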
Here is a workaround. I converted drink_dictionary into a dataframe so that it can be used together with the other information in the dataframe.
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

df_drink_unit = (spark.createDataFrame([drink_dictionary],
                                       schema=MapType(StringType(), MapType(StringType(), StringType())))
                 .select(F.explode('value').alias('Beverage', 'unit_list')))
This creates the dataframe as:
+--------+--------------------+
|Beverage| unit_list|
+--------+--------------------+
| juice|[LB/L -> 90, KG/G...|
| soda|[LB/L -> 100, G/M...|
| water| [LB/L -> 1]|
+--------+--------------------+
Then, join it to the main dataframe, use getItem to look up the corresponding value from unit_list, and compute Corrected_Amount. (Since the schema stores the values as strings, Spark casts them to numeric during the arithmetic.)
sdf = (sdf.join(df_drink_unit, on='Beverage', how='left')
       .withColumn('unit', F.concat(F.col('Weight'), F.lit('/'), F.col('Volume')))
       .withColumn('unit_value', F.col('unit_list').getItem(F.col('unit')))
       .withColumn('Corrected_Amount', F.col('Amount') / F.col('Sample') * F.col('unit_value')))
sdf.select('Beverage', 'ID', 'Corrected_Amount').show()
+--------+---+----------------+
|Beverage| ID|Corrected_Amount|
+--------+---+----------------+
| juice| 2| 6000.0|
| water| 3| 35.0|
| soda| 1| 2000.0|
| soda| 4| 2200.0|
+--------+---+----------------+
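The arithmetic behind that output can be checked in plain Python against the question's sample data. This sketch replays the join + getItem lookup without needing a Spark session:

```python
# Replay the join + getItem lookup from the answer in plain Python,
# using the sample data from the question.
drink_dictionary = {"soda": {"LB/L": 100, "G/M2": 200},
                    "juice": {"KG/GA": 500, "LB/L": 90},
                    "water": {"LB/L": 1}}
data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]

corrected = {}
for _id, beverage, weight, sample, volume, amount in data:
    # Equivalent of building the 'unit' key and calling getItem on unit_list.
    unit_value = drink_dictionary[beverage][f"{weight}/{volume}"]
    corrected[_id] = amount / sample * unit_value
# corrected == {1: 2000.0, 2: 6000.0, 3: 35.0, 4: 2200.0}
```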