PySpark UDF 优化挑战

PySpark UDF optimization challenge

我正在尝试优化下面的代码。 1000 行数据的 when 运行 大约需要 12 分钟才能完成。我们的用例要求数据大小约为 25K - 50K 行,这将使该实现完全不可行。

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = {}
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except:
                print("Exception in load_glove")
                pass
    return vector_dict

# Returning an Array of doubles from the udf
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

我想知道你们是否可以帮助回答下面的问题

是否在每次迭代时都调用了 spacy.load() 和 load_glove() 方法? 有没有办法为每个工作节点准备一次 load_glove() 数据,而不是为每行数据准备一次? load_glove 方法 returns 一个字典对象,最大可达 5GB。有没有办法在主节点上准备好,然后作为参数传递给 UDF?

感谢您的建议。提前致谢!

是的,在当前的实现中,每次您的函数 运行 时都会执行所有模型加载代码,这远非最佳。没有办法将它直接从驱动程序传递到工作节点,但有一种类似的方法——在每个工作人员上初始化模型,但只初始化一次。为此,您必须使用惰性函数,该函数仅在需要实际结果时才执行 - 因此,在 worker 上。

尝试做这样的事情:

# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load('en', max_length=6000000)
    SPACY_MODEL = _model
    return SPACY_MODEL

@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if posible, and remove unused modules 
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = get_spacy_model()
  # your further processing

我想你可以尝试将手套加载代码添加到类似的函数中。

您可以尝试在此处阅读更多相关信息:https://haridas.in/run-spacy-jobs-on-apache-spark.html(这不是我的博客,只是在尝试使用 Spacy 模型执行您需要的相同操作时发现此信息)。

udf-s 这么慢的主要原因是它们不能被 spark 优化(它们被当作黑盒子对待)。所以为了让它更快,你需要尽可能多地取出并用香草火花函数代替它。理想的做法是只将 spacy 部分(我不熟悉该模块)留在 udf 中,得到一个结果 DF,然后使用 vanilla spark 函数完成所需的其余转换。

例如 load_glove() 将按照其他答案所说的每一行执行。但是看代码,它的格式看起来可以变成一个有 301 列的数据框。然后你可以加入它以获得所需的值。 (如果你能让另一个DF有word.text作为键,没有数据就有点难说了,但理论上看起来是可以的)。