Exporting PySpark Dataframe to Azure Data Lake Takes Forever

The code below runs perfectly fine on standalone PySpark 2.4 on Mac OS (Python 3.7) when the input data is small (around 6 GB). However, when I run the code on an HDInsight cluster (HDI 4.0, i.e. Python 3.5, PySpark 2.4, 4 worker nodes each with 64 cores and 432 GB of RAM, 2 head nodes each with 4 cores and 28 GB of RAM, 2nd-generation Data Lake) with larger input data (169 GB), the last step, writing the data to the Data Lake, took forever to complete (I killed it after 24 hours of execution). Given that HDInsight is not popular in the cloud computing community, I could only refer to posts complaining about slow writes of dataframes to S3. Some suggested repartitioning the dataset, which I did, but it did not help.

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT 
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword", 
        regexp_extract(
            "menu_name", 
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)", 
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items", 
        minSupport=0, 
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "items", 
        array_remove("items", "-999")
    )
    frequency = frequency.filter(size(col("items")) > 0)
    frequency = frequency.orderBy(asc("items"), desc("freq"))
    frequency = frequency.dropDuplicates(["items"])
    frequency = frequency.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.items)
    )
    frequency.createOrReplaceTempView("frequency")
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    lift.createOrReplaceTempView("lift")
    result = spark.sql(
        """
        SELECT lift.antecedent, freq AS frequency, lift
        FROM lift
        INNER JOIN frequency
        ON lift.antecedent = frequency.antecedent
        """
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path="<path to 169.1 GB of txt>",
        beer_path="<path to 4.9 GB of csv>",
        corpus_path="<path to 210 KB of csv>",
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

I first thought the problem was caused by the parquet file format. However, when I tried csv, I ran into the same issue. I tried result.count() to see how many rows the table result has, and getting the row count took just as long as writing the data to the data lake. Broadcast hash join instead of the default sort-merge join is recommended when a large dataset is joined with a small one. I thought it was worth trying, because the smaller sample in the pilot study told me the row count of frequency is roughly 0.09% of that of lift (see the query below if you have trouble keeping track of frequency and lift).

SELECT lift.antecedent, freq AS frequency, lift
FROM lift
INNER JOIN frequency
ON lift.antecedent = frequency.antecedent
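
For reference, a broadcast hash join can be requested in PySpark 2.4 either on the DataFrame side or through a SQL hint. A sketch using the frequency and lift views defined in the code above (the modified code below uses the DataFrame hint variant):

from pyspark.sql.functions import broadcast

# DataFrame API: mark the small side for a broadcast hash join
result = lift.join(broadcast(frequency), "antecedent", "inner")

# SQL hint form of the same request
result = spark.sql("""
    SELECT /*+ BROADCAST(frequency) */ lift.antecedent, freq AS frequency, lift
    FROM lift
    INNER JOIN frequency
    ON lift.antecedent = frequency.antecedent
""")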

With that in mind, I modified my code:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType
from pyspark.sql.functions import udf, regexp_extract, collect_set, array_remove, col, size, asc, desc
from pyspark.ml.fpm import FPGrowth
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"

def work(order_path, beer_path, corpus_path, output_path, FREQ_THRESHOLD=1000, LIFT_THRESHOLD=1):
    print("Creating Spark Environment...")
    spark = SparkSession.builder.appName("Menu").getOrCreate()
    print("Spark Environment Created!")
    print("Working on Checkpoint1...")
    orders = spark.read.csv(order_path)
    orders.createOrReplaceTempView("orders")
    orders = spark.sql(
        "SELECT _c14 AS order_id, _c31 AS in_menu_id, _c32 AS in_menu_name FROM orders"
    )
    orders.createOrReplaceTempView("orders")
    beer = spark.read.csv(
        beer_path,
        header=True
    )
    beer.createOrReplaceTempView("beer")
    beer = spark.sql(
        """
        SELECT 
            order_id AS beer_order_id,
            in_menu_id AS beer_in_menu_id,
            '-999' AS beer_in_menu_name
        FROM beer
        """
    )
    beer.createOrReplaceTempView("beer")
    orders = spark.sql(
        """
        WITH orders_beer AS (
            SELECT *
            FROM orders
            LEFT JOIN beer
            ON orders.in_menu_id = beer.beer_in_menu_id
        )
        SELECT
            order_id,
            in_menu_id,
            CASE
                WHEN beer_in_menu_name IS NOT NULL THEN beer_in_menu_name
                WHEN beer_in_menu_name IS NULL THEN in_menu_name
            END AS menu_name
        FROM orders_beer
        """
    )
    print("Checkpoint1 Completed!")
    print("Working on Checkpoint2...")
    corpus = spark.read.csv(
        corpus_path,
        header=True
    )
    keywords = corpus.select("Food_Name").rdd.flatMap(lambda x: x).collect()
    orders = orders.withColumn(
        "keyword", 
        regexp_extract(
            "menu_name", 
            "(?=^|\s)(" + "|".join(keywords) + ")(?=\s|$)", 
            0
        )
    )
    orders.createOrReplaceTempView("orders")
    orders = spark.sql("""
        SELECT order_id, in_menu_id, keyword
        FROM orders
        WHERE keyword != ''
    """)
    orders.createOrReplaceTempView("orders")
    orders = orders.groupBy("order_id").agg(
        collect_set("keyword").alias("items")
    )
    print("Checkpoint2 Completed!")
    print("Working on Checkpoint3...")
    fpGrowth = FPGrowth(
        itemsCol="items", 
        minSupport=0, 
        minConfidence=0
    )
    model = fpGrowth.fit(orders)
    print("Checkpoint3 Completed!")
    print("Working on Checkpoint4...")
    frequency = model.freqItemsets
    frequency = frequency.filter(col("freq") > FREQ_THRESHOLD)
    frequency = frequency.withColumn(
        "antecedent", 
        array_remove("items", "-999")
    )
    frequency = frequency.drop("items")
    frequency = frequency.filter(size(col("antecedent")) > 0)
    frequency = frequency.orderBy(asc("antecedent"), desc("freq"))
    frequency = frequency.dropDuplicates(["antecedent"])
    frequency = frequency.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(frequency.antecedent)
    )
    lift = model.associationRules
    lift = lift.drop("confidence")
    lift = lift.filter(col("lift") > LIFT_THRESHOLD)
    lift = lift.filter(
        udf(
            lambda x: x == ["-999"], BooleanType()
        )(lift.consequent)
    )
    lift = lift.drop("consequent")
    lift = lift.withColumn(
        "antecedent", 
        udf(
            lambda x: "|".join(sorted(x)), StringType()
        )(lift.antecedent)
    )
    result = lift.join(
        frequency.hint("broadcast"), 
        ["antecedent"], 
        "inner"
    )
    print("Checkpoint4 Completed!")
    print("Writing Result to Data Lake...")
    result.repartition(1024).write.mode("overwrite").parquet(output_path)
    print("All Done!")

def main():
    work(
        order_path="<path to 169.1 GB of txt>",
        beer_path="<path to 4.9 GB of csv>",
        corpus_path="<path to 210 KB of csv>",
        output_path="final_result.parquet"
    )

if __name__ == "__main__":
    main()

The code ran perfectly with the same sample data on my Mac OS and, as expected, took less time (34 seconds vs. 26 seconds). I then decided to run the code on HDInsight with the full dataset. In the last step, writing the data to the data lake, the task failed and I was told Job canceled because SparkContext was shut down. I am fairly new to big data and have no idea what this means. Posts on the internet say there could be many reasons behind it. Whatever method I should use, how can I optimize my code so that I get the desired output in the data lake within a bearable amount of time?

I would try several things, ordered by the amount of effort they require:

  • Check that your ADL storage is in the same region as your HDInsight cluster.
  • Add a call to df = df.cache() after the heavy computations, or even write the dataframe to storage and read it back between those computations.
  • Replace your UDFs with "native" Spark code, since UDFs are one of the performance bad practices of Spark (see the sketch below).
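
For example, the "|".join(sorted(x)) and x == ["-999"] UDFs in the code above can be expressed with built-in functions such as sort_array, array_join, array and lit (all available in PySpark 2.4). A minimal sketch, reusing the column names from the question's code:

from pyspark.sql.functions import array, array_join, col, lit, sort_array

# "|".join(sorted(x)) without a Python UDF
lift = lift.withColumn("antecedent", array_join(sort_array("antecedent"), "|"))

# x == ["-999"] without a Python UDF
lift = lift.filter(col("consequent") == array(lit("-999")))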

It took me five days to figure this out. Below are the approaches I used to optimize the code. The execution time dropped from more than 24 hours to around 10 minutes. Code optimization really matters.

  1. As David Taub points out above, use df.cache() after heavy computations or before feeding the data to the model. I used df.cache().count(), since .cache() alone is lazily evaluated and the subsequent .count() forces evaluation of the whole dataset.
  2. Use flashtext instead of regular expressions to extract keywords. This greatly improved code performance (see the sketch after this list).
  3. Be careful with joins/merges. They can become very slow because of data skew. Always think about ways to avoid unnecessary joins.
  4. Set minSupport for FPGrowth. This significantly reduced the time spent calling model.freqItemsets.
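
A minimal sketch of the flashtext idea from item 2, assuming the keyword matching from Checkpoint2 is moved into a Python UDF that wraps a broadcast KeywordProcessor (menu_name and keywords come from the question's code; the extract_keyword helper and the choice to return only the first match are assumptions for illustration):

from flashtext import KeywordProcessor
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

processor = KeywordProcessor()
processor.add_keywords_from_list(keywords)  # keywords collected from the corpus
bc_processor = spark.sparkContext.broadcast(processor)

@udf(StringType())
def extract_keyword(menu_name):
    # return the first keyword found, or '' when nothing matches
    matches = bc_processor.value.extract_keywords(menu_name or "")
    return matches[0] if matches else ""

orders = orders.withColumn("keyword", extract_keyword("menu_name"))

Unlike the giant alternation regex, the trie-based KeywordProcessor scales well as the number of keywords in the corpus grows.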