使用 Word2VecModel.transform() 在 map 函数中不起作用
using Word2VecModel.transform() does not work in map function
我已经使用 Spark 构建了一个 Word2Vec 模型并将其保存为模型。现在,我想在另一个代码中使用它作为离线模型。我已经加载了模型并用它来呈现单词的向量(例如你好)并且效果很好。但是,我需要在一个 RDD 中使用 map 来调用它。
当我在 map 函数中调用 model.transform() 时,它会抛出此错误:
"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
代码:
from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel
sc = SparkContext('local[4]',appName='Word2Vec')
model=Word2VecModel.load(sc, "word2vecModel")
x= model.transform("Hello")
print(x[0]) # it works fine and returns [0.234, 0.800,....]
y=sc.parallelize([['Hello'],['test']])
y.map(lambda w: model.transform(w[0])).collect() #it throws the error
非常感谢您的帮助。
这是预期的行为。与其他 MLlib
模型一样,Python 对象只是 Scala 模型的包装器,实际处理委托给其对应的 JVM。由于工作人员无法访问 Py4J 网关(请参阅 ),因此您无法从操作或转换中调用 Java / Scala 方法。
通常 MLlib 模型提供了一个可以直接在 RDD 上工作的辅助方法,但这里不是这种情况。 Word2VecModel
提供了 getVectors
方法,该方法 returns 从单词到向量的映射,但不幸的是它是 JavaMap
所以它不能在转换中工作。你可以尝试这样的事情:
from pyspark.mllib.linalg import DenseVector
vectors_ = model.getVectors() # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
for k in vectors_.keys()}
获取Python字典,但会非常慢。另一种选择是以 Python 可以使用的形式将此对象转储到磁盘,但这需要对 Py4J 进行一些修改,最好避免这种情况。相反,让我们将模型读取为 DataFrame:
lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")
我们将得到以下结构:
lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## | |-- element: float (containsNull = true)
可用于将单词映射到向量,例如通过 join
:
from pyspark.sql.functions import col
words = sc.parallelize([('hello', ), ('test', )]).toDF(["word"]).alias("words")
words.join(lookup, col("words.word") == col("lookup.word"))
## +-----+-----+--------------------+
## | word| word| vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
如果数据适合驱动程序/工作程序内存,您可以尝试使用广播收集和映射:
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
rdd = sc.parallelize([['Hello'],['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws])
我已经使用 Spark 构建了一个 Word2Vec 模型并将其保存为模型。现在,我想在另一个代码中使用它作为离线模型。我已经加载了模型并用它来呈现单词的向量(例如你好)并且效果很好。但是,我需要在一个 RDD 中使用 map 来调用它。
当我在 map 函数中调用 model.transform() 时,它会抛出此错误:
"It appears that you are attempting to reference SparkContext from a broadcast " Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
代码:
from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel
sc = SparkContext('local[4]',appName='Word2Vec')
model=Word2VecModel.load(sc, "word2vecModel")
x= model.transform("Hello")
print(x[0]) # it works fine and returns [0.234, 0.800,....]
y=sc.parallelize([['Hello'],['test']])
y.map(lambda w: model.transform(w[0])).collect() #it throws the error
非常感谢您的帮助。
这是预期的行为。与其他 MLlib
模型一样,Python 对象只是 Scala 模型的包装器,实际处理委托给其对应的 JVM。由于工作人员无法访问 Py4J 网关(请参阅
通常 MLlib 模型提供了一个可以直接在 RDD 上工作的辅助方法,但这里不是这种情况。 Word2VecModel
提供了 getVectors
方法,该方法 returns 从单词到向量的映射,但不幸的是它是 JavaMap
所以它不能在转换中工作。你可以尝试这样的事情:
from pyspark.mllib.linalg import DenseVector
vectors_ = model.getVectors() # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
for k in vectors_.keys()}
获取Python字典,但会非常慢。另一种选择是以 Python 可以使用的形式将此对象转储到磁盘,但这需要对 Py4J 进行一些修改,最好避免这种情况。相反,让我们将模型读取为 DataFrame:
lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")
我们将得到以下结构:
lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## | |-- element: float (containsNull = true)
可用于将单词映射到向量,例如通过 join
:
from pyspark.sql.functions import col
words = sc.parallelize([('hello', ), ('test', )]).toDF(["word"]).alias("words")
words.join(lookup, col("words.word") == col("lookup.word"))
## +-----+-----+--------------------+
## | word| word| vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
如果数据适合驱动程序/工作程序内存,您可以尝试使用广播收集和映射:
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
rdd = sc.parallelize([['Hello'],['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws])