How to modify a Pyspark dataframe using a for loop and case-when conditionals?

I am trying to modify a Spark dataframe such that, based on a filter value, we subset the data into multiple conditions and then flag/modify variables based on those conditions.

Ideally, I would like the final dataframe to be the same size as the original, just with the necessary modifications.

An example is below:

import pandas as pd
from pyspark.sql import SparkSession, functions as spark_fns

spark = SparkSession.builder.getOrCreate()

data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]

df = pd.DataFrame(
    data, columns=["ID", "Beverage", "Weight", "Sample", "Volume", "Amount"]
)

drink_dictionary = {
    "soda": {"LB/L": 100, "G/M2": 200},
    "juice": {"KG/GA": 500, "LB/L": 90},
    "water": {"LB/L": 1},
}

sdf = spark.createDataFrame(df)

for drink in ["soda", "juice", "water"]:
    for mass_unit in drink_dictionary[drink]:
        weight_unit, volume_unit = mass_unit.split("/")
        value = drink_dictionary[drink][mass_unit]
        
        # create condition, i.e specific weight and volumes,
        # IF this condition is met, modify a different variable.
        # Loop to next condition under same Beverage type.
        # Modify accordingly.

        condition = (spark_fns.col("Weight") == weight_unit) & (
            spark_fns.col("Volume") == volume_unit
        )

        new_sdf = (
            sdf.filter(spark_fns.col("Beverage") == drink)
            .withColumn("Flag", spark_fns.when((condition), True).otherwise(False))
            .withColumn(
                "corrected_amount",
                spark_fns.when(
                    (condition),
                    spark_fns.expr(f"Amount / Sample * {value}"),
                ).otherwise(sdf["Amount"]),
            )
        )

This output is correct. I would like the output to look like the below (which would involve looping through all the beverages):

ID   Beverage  Weight  Sample  Volume  Amount  Corrected_Amount
1    soda      LB      1       L       20      2000
2    juice     KG      1       GA      12      6000
3    water     LB      1       L       35      35
4    soda      G       1       M2      11      2200

Here is a workaround: change drink_dictionary into a dataframe, so that it can be used alongside the other information in the dataframe.

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

df_drink_unit = (spark.createDataFrame([drink_dictionary],
                                       schema=MapType(StringType(), MapType(StringType(), StringType())))
                 .select(F.explode('value').alias('Beverage', 'unit_list')))

This creates the dataframe as

+--------+--------------------+
|Beverage|           unit_list|
+--------+--------------------+
|   juice|[LB/L -> 90, KG/G...|
|    soda|[LB/L -> 100, G/M...|
|   water|         [LB/L -> 1]|
+--------+--------------------+
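
Note that, because the schema declares the map values as StringType, unit_list stores the numbers as strings; Spark casts them implicitly during the arithmetic below. If you prefer explicit types, you can cast the map values up front — this tweak is my own addition and the transform_values function requires Spark 3.1+:

# Optional (Spark 3.1+): cast the string map values to doubles up front
# instead of relying on the implicit cast in the division below.
df_drink_unit = df_drink_unit.withColumn(
    'unit_list', F.transform_values('unit_list', lambda k, v: v.cast('double'))
)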

Then, join it to the main dataframe, use getItem to look up the corresponding value in unit_list, and calculate Corrected_Amount:

sdf = (sdf.join(df_drink_unit, on='Beverage', how='left')
      .withColumn('unit', F.concat(F.col('Weight'), F.lit('/'), F.col('Volume')))
      .withColumn('unit_value', F.col('unit_list').getItem(F.col('unit')))
      .withColumn('Corrected_Amount', F.col('Amount') / F.col('Sample') * F.col('unit_value')))
sdf.select('Beverage', 'ID', 'Corrected_Amount').show()

+--------+---+----------------+
|Beverage| ID|Corrected_Amount|
+--------+---+----------------+
|   juice|  2|          6000.0|
|   water|  3|            35.0|
|    soda|  1|          2000.0|
|    soda|  4|          2200.0|
+--------+---+----------------+
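
For completeness, the original loop-based approach can also be made to cover every beverage without a join, applied to the original sdf before the join above: flatten drink_dictionary into (Beverage, Weight, Volume, value) tuples and fold them into one chained when expression, so every row keeps its Amount unless one of the conditions rewrites it. This is only a sketch; the flat_conditions name and the functools.reduce fold are my own.

from functools import reduce
from pyspark.sql import functions as spark_fns

# Flatten the nested dictionary into (Beverage, Weight, Volume, value) tuples.
flat_conditions = [
    (drink, *unit.split("/"), value)
    for drink, units in drink_dictionary.items()
    for unit, value in units.items()
]

# Fold every condition into a single chained CASE WHEN over the full frame,
# so nothing is filtered out or overwritten between iterations.
corrected = reduce(
    lambda acc, c: spark_fns.when(
        (spark_fns.col("Beverage") == c[0])
        & (spark_fns.col("Weight") == c[1])
        & (spark_fns.col("Volume") == c[2]),
        spark_fns.expr(f"Amount / Sample * {c[3]}"),
    ).otherwise(acc),
    flat_conditions,
    spark_fns.col("Amount"),
)

new_sdf = sdf.withColumn("Corrected_Amount", corrected)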