如何在不重新分区的情况下并行执行 Spark UDF

How to execute Spark UDF in parallel without repartitioning

我有一个小型 Hive Table,通过 HDFS 保存了 1500 万行(parquet/1152 个文件 - 超过 30GB)。

我正在对科学摘要进行 LDA。因此,第一步是使用 StanfordNLP 提取一些名词 phrases/chunk 短语,我写了一个 UDF 来实现这个目标。

现在在性能方面,有两种情况,每种情况都有非常不同的结果。

场景一:

val hiveTable = hivecontext.sql("""
SELECT ab AS text,
          pmid AS id
  FROM scientific.medline      
    LIMIT 15000000
""")

然后我通过 hiveTable:

调用我的 UDF
val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))

现在,如果我触发任何 action/transformation,例如 .count() 或在“postagsDF”上执行 CountVectorizer(),我会看到 2 个阶段。一个具有适当数量的任务(分区),另一个阶段只有一个任务。第一个在完成一些 Input/Shuffle 写入后很快结束,但第二个只有一个任务需要很长时间。看来我的 UDF 正在这个只有一个任务的阶段执行。 (需要数小时,完成期间没有资源 activity)

场景二:

val hiveTable = hivecontext.sql("""
SELECT ab AS text,
          pmid AS id
  FROM scientific.medline      
    LIMIT 15000000
""")

我将 DataFrame 重新分区为 spark 根据镶木地板的数量检测到的确切分区数。 (我可以选择任何其他数字,但这个数字似乎还可以,因为我有超过 500 个核心可用 - 每个核心 2 个任务)

val repartitionedDocDF = docDF.repartition(1152)

现在通过 hiveTable:

调用我的 UDF
val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))

但是,任何action/transformation这次都会是四个阶段。其中两个阶段(比方说计数)是 1152 个任务,其中两个是单一任务。我可以看到我的 UDF 正在其中一个阶段执行,所有执行程序都正确使用了我的整个集群,执行了 1152 个任务。

场景编号 1 的结果: 查看我的集群,在 long-运行ning 单任务阶段没有发生太多事情。没有cpu使用,没有内存,没有网络,没有IOactivity。只有一个执行者执行一项任务,在每个 document/column.

上应用我的 UDF

基准: 场景 1 需要 3-4 小时才能完成仅 100 万行。 (我迫不及待地想看看 1500 万行需要多少)

场景 2 的结果: 查看我的集群,我可以清楚地看到我所有的资源都被利用了。我所有的节点都快满负荷了。

基准: 场景 2 处理 1500 万行需要 30 多分钟。

真题

  1. 刚刚发生了什么?我认为默认情况下,Dataframe 上的 UDF 会 运行 并行。如果 partions/tasks 的数量多于或少于内核总数,可能会重新分区,但至少与默认的 200 partitions/tasks 平行。我只是想了解为什么我的 UDf 是单一任务并忽略默认的 200 和实际分区大小。 (这不仅仅是关于性能,它是单任务作业 vs 多任务作业)

  2. 是否有任何其他方法可以使 UDF 在所有执行程序上并行执行而无需调用重新分区。我不反对重新分区,但这是非常昂贵的操作,我认为它不应该是并行制作 UDF 运行s 的唯一方法。即使当我重新分区到完全相同数量的 partitions/files 时,我仍然需要观察超过 20GB 的随机读取和写入在我的集群上飞过。

我已经阅读了有关重新分区和 UDF 的所有内容,但我找不到类似的问题,即默认情况下无法 运行 UDF 并行,除非它重新分区。 (当你将一个列的类型从 int 转换为 bigint 时,简单的 UDF 可能不可见,但当你进行 NLP 时,它确实是可见的)

我的集群大小:30 个节点 (16core/32G) - Spark 1.6 Cloudera CDH 5.11.1 火花:--driver-cores 5 --driver-memory 8g --executor-cores 5 --executor-memory 5g --num-executors 116

非常感谢,

更新

我 运行 没有 LIMIT 子句的相同代码,它在 18 分钟内完成了!所以 LIMIT 是原因(答案中有更多内容):

这里的问题与您在查询中使用的 LIMIT 子句具体相关,与 UDF 无关。 LIMIT子句将所有结果数据重新分区到一个分区,因此不适合大样本。

如果您想避免该问题并以某种方式减少记录数,最好先对您的数据进行采样:

val p: Double = ???

spark.sql(s"SELECT * FROM df TABLESAMPLE($p percent)")

或:

spark.table("df").sample(false, p)

其中 p 是所需的记录部分。

请记住,使用准确数量的值进行抽样会遇到与 LIMIT 子句相同的问题。