在所有 spark worker 节点上初始化 gensim 对象

Initializing gensim objects on all spark worker nodes

我创建了一个函数和一个 UDF,

    def test(string):
        path_index=SparkFiles.get("corpus_final_production.index")
        path_dictionary=SparkFiles.get('dictionary_production.gensim')
        path_corpus=SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        tf_idf = gensim.models.TfidfModel(corpus)
        index_tmpfile = get_tmpfile(path_index)
        sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
        query_doc=word_tokenize(string.lower())
        query_doc_bow=dictionary.doc2bow(query_doc)
        query_doc_tf_idf=tf_idf[query_doc_bow]
        sum_of_sims=np.sum(sims[query_doc_tf_idf], dtype=np.float32)
        max_sims=np.amax(sims[query_doc_tf_idf])
        max_count=np.count_nonzero(sims[query_doc_tf_idf] >= max_sims-0.05)
        max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
        return max_sims_origin

   test_udf = udf(lambda x: test(x),StringType())
   df_new = garuda.withColumn('max_sim_origin', test_udf(garuda.text))

它工作正常,但如您所见,我正在对 pyspark 数据框应用按行操作。对于每一行,字典语料库和 sims 都会生成索引,每行需要将近 6 分钟。

有没有办法让我在每个工作节点上初始化字典、语料库和索引文件,而不是在 UDF 中调用它。

我是 spark 的新手,我们将不胜感激任何一点帮助

我添加了所有字典和语料库文件,因为它是使用 sc.addFile()

预生成的

所以我想通了。 gensim 的工作方式是创建分片,现在如果我将 gensim sims 对象传递给函数,该对象只有本地机器的路径(master note 地址),而不是我在每个 worker 中初始化一次,这样

def test(string):
    import os.path
    path_index=SparkFiles.get("corpus_final_production.index")
    file_exists = os.path.exists(path_index)    
    if file_exists:
        path_index=SparkFiles.get("corpus_final_production.index")
        path_dictionary=SparkFiles.get('dictionary_production.gensim')
        path_corpus=SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        sims =  gensim.similarities.Similarity.load(path_index)
        sims.output_prefix = path_index
        sims.check_moved()
        print("Sims: "+str(len(sims)))
        tf_idf = gensim.models.TfidfModel(corpus)
        query_doc=word_tokenize(string.lower())
        query_doc_bow=dictionary.doc2bow(query_doc)
        query_doc_tf_idf=tf_idf[query_doc_bow]
        max_sims=(np.amax(sims[query_doc_tf_idf]))
        max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
    else:
        path_index=SparkFiles.get("corpus_final_production.index")
        path_dictionary=SparkFiles.get('dictionary_production.gensim')
        path_corpus=SparkFiles.get("corpus_final_production")
        dictionary = corpora.Dictionary.load(path_dictionary)
        corpus = corpora.MmCorpus(path_corpus)
        print("Dictionary: "+str(len(dictionary)))
        tf_idf = gensim.models.TfidfModel(corpus)
        print("Corpus: "+str(len(corpus)))
        index_tmpfile = get_tmpfile(path_index)
        sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
        sims.save(path_index)
        print(len(sims))
        query_doc=word_tokenize(string.lower())
        query_doc_bow=dictionary.doc2bow(query_doc)
        query_doc_tf_idf=tf_idf[query_doc_bow]
        max_sims=(np.amax(sims[query_doc_tf_idf]))
        max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
    return t.Row('max_sims_origin', 'max_sims')(max_sims_origin,float(max_sims))
    

如果路径存在,它会搜索它,但第一次它总是会在工作节点上创建本地副本