在所有 spark worker 节点上初始化 gensim 对象
Initializing gensim objects on all spark worker nodes
我创建了一个函数和一个 UDF,
def test(string):
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
tf_idf = gensim.models.TfidfModel(corpus)
index_tmpfile = get_tmpfile(path_index)
sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
sum_of_sims=np.sum(sims[query_doc_tf_idf], dtype=np.float32)
max_sims=np.amax(sims[query_doc_tf_idf])
max_count=np.count_nonzero(sims[query_doc_tf_idf] >= max_sims-0.05)
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
return max_sims_origin
test_udf = udf(lambda x: test(x),StringType())
df_new = garuda.withColumn('max_sim_origin', test_udf(garuda.text))
它工作正常,但如您所见,我正在对 pyspark 数据框应用按行操作。对于每一行,字典语料库和 sims 都会生成索引,每行需要将近 6 分钟。
有没有办法让我在每个工作节点上初始化字典、语料库和索引文件,而不是在 UDF 中调用它。
我是 spark 的新手,我们将不胜感激任何一点帮助
我添加了所有字典和语料库文件,因为它是使用 sc.addFile()
预生成的
所以我想通了。 gensim 的工作方式是创建分片,现在如果我将 gensim sims 对象传递给函数,该对象只有本地机器的路径(master note 地址),而不是我在每个 worker 中初始化一次,这样
def test(string):
import os.path
path_index=SparkFiles.get("corpus_final_production.index")
file_exists = os.path.exists(path_index)
if file_exists:
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
sims = gensim.similarities.Similarity.load(path_index)
sims.output_prefix = path_index
sims.check_moved()
print("Sims: "+str(len(sims)))
tf_idf = gensim.models.TfidfModel(corpus)
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
max_sims=(np.amax(sims[query_doc_tf_idf]))
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
else:
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
print("Dictionary: "+str(len(dictionary)))
tf_idf = gensim.models.TfidfModel(corpus)
print("Corpus: "+str(len(corpus)))
index_tmpfile = get_tmpfile(path_index)
sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
sims.save(path_index)
print(len(sims))
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
max_sims=(np.amax(sims[query_doc_tf_idf]))
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
return t.Row('max_sims_origin', 'max_sims')(max_sims_origin,float(max_sims))
如果路径存在,它会搜索它,但第一次它总是会在工作节点上创建本地副本
我创建了一个函数和一个 UDF,
def test(string):
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
tf_idf = gensim.models.TfidfModel(corpus)
index_tmpfile = get_tmpfile(path_index)
sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
sum_of_sims=np.sum(sims[query_doc_tf_idf], dtype=np.float32)
max_sims=np.amax(sims[query_doc_tf_idf])
max_count=np.count_nonzero(sims[query_doc_tf_idf] >= max_sims-0.05)
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
return max_sims_origin
test_udf = udf(lambda x: test(x),StringType())
df_new = garuda.withColumn('max_sim_origin', test_udf(garuda.text))
它工作正常,但如您所见,我正在对 pyspark 数据框应用按行操作。对于每一行,字典语料库和 sims 都会生成索引,每行需要将近 6 分钟。
有没有办法让我在每个工作节点上初始化字典、语料库和索引文件,而不是在 UDF 中调用它。
我是 spark 的新手,我们将不胜感激任何一点帮助
我添加了所有字典和语料库文件,因为它是使用 sc.addFile()
预生成的所以我想通了。 gensim 的工作方式是创建分片,现在如果我将 gensim sims 对象传递给函数,该对象只有本地机器的路径(master note 地址),而不是我在每个 worker 中初始化一次,这样
def test(string):
import os.path
path_index=SparkFiles.get("corpus_final_production.index")
file_exists = os.path.exists(path_index)
if file_exists:
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
sims = gensim.similarities.Similarity.load(path_index)
sims.output_prefix = path_index
sims.check_moved()
print("Sims: "+str(len(sims)))
tf_idf = gensim.models.TfidfModel(corpus)
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
max_sims=(np.amax(sims[query_doc_tf_idf]))
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
else:
path_index=SparkFiles.get("corpus_final_production.index")
path_dictionary=SparkFiles.get('dictionary_production.gensim')
path_corpus=SparkFiles.get("corpus_final_production")
dictionary = corpora.Dictionary.load(path_dictionary)
corpus = corpora.MmCorpus(path_corpus)
print("Dictionary: "+str(len(dictionary)))
tf_idf = gensim.models.TfidfModel(corpus)
print("Corpus: "+str(len(corpus)))
index_tmpfile = get_tmpfile(path_index)
sims = gensim.similarities.Similarity(index_tmpfile,tf_idf[corpus],num_features=len(dictionary))
sims.save(path_index)
print(len(sims))
query_doc=word_tokenize(string.lower())
query_doc_bow=dictionary.doc2bow(query_doc)
query_doc_tf_idf=tf_idf[query_doc_bow]
max_sims=(np.amax(sims[query_doc_tf_idf]))
max_sims_origin=file_docs[np.argmax(sims[query_doc_tf_idf])]
return t.Row('max_sims_origin', 'max_sims')(max_sims_origin,float(max_sims))
如果路径存在,它会搜索它,但第一次它总是会在工作节点上创建本地副本