Broadcast Annoy object in Spark (for nearest neighbors)?
Since Spark's mllib has no nearest-neighbors functionality, I'm trying to use Annoy for approximate nearest neighbors. I try to broadcast the Annoy object and pass it to the workers; however, it does not operate as expected.
Below is code for reproducibility (run in PySpark). The problem is highlighted in the difference seen when using Annoy with versus without Spark.
from annoy import AnnoyIndex
import random
random.seed(42)
f = 40
t = AnnoyIndex(f) # Length of item vector that will be indexed
allvectors = []
for i in xrange(20):
    v = [random.gauss(0, 1) for z in xrange(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10) # 10 trees
# Use Annoy with Spark
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1], n=5))
print "Five closest neighbors for first vector with Spark:",
print x.first()
# Use Annoy without Spark
print "Five closest neighbors for first vector without Spark:",
print(t.get_nns_by_vector(vector=allvectors[0][1], n=5))
Output seen:
Five closest neighbors for first vector with Spark: None
Five closest neighbors for first vector without Spark: [0, 13, 12, 6, 4]
I've never used Annoy, but I'm pretty sure the package description explains what is going on here:
It also creates large read-only file-based data structures that are mmapped into memory so that many processes may share the same data.
Because the index is memory-mapped from a file, all of its data is lost in transit when the object is serialized and passed to the workers.
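To see that the data lives in the file rather than in the Python object, you can round-trip the index through save/load locally. This is just a quick sketch reusing t, f and allvectors from the question's code; the file name index.ann is arbitrary:
t.save("index.ann")   # persist the mmappable index to disk
u = AnnoyIndex(f)
u.load("index.ann")   # mmaps the saved file into the fresh object
print(u.get_nns_by_vector(allvectors[0][1], 5))  # [0, 13, 12, 6, 4], same as the non-Spark result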
Try something like this:
from pyspark import SparkFiles
t.save("index.ann")
sc.addPyFile("index.ann")
def find_neighbors(iter):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))
    return (t.get_nns_by_vector(vector=x[1], n=5) for x in iter)
sparkvectors.mapPartitions(find_neighbors).first()
## [0, 13, 12, 6, 4]
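A note on the design: mapPartitions loads the index once per partition instead of once per record, and the load itself is cheap because it only mmaps the file. If you want to open the file at most once per worker process, one option is a module-level cache. The names F, get_index and _index below are made up for illustration, and this only reliably helps when the function lives in a module shipped to the executors (for example via --py-files), because a closure pickled from the driver does not necessarily keep its globals between tasks:
from annoy import AnnoyIndex
from pyspark import SparkFiles

F = 40         # dimensionality; must match the index built on the driver
_index = None  # hypothetical per-process cache

def get_index():
    # Load the index shipped via sc.addFile at most once per worker process.
    global _index
    if _index is None:
        idx = AnnoyIndex(F)
        idx.load(SparkFiles.get("index.ann"))
        _index = idx
    return _index

def find_neighbors(iter):
    t = get_index()
    return (t.get_nns_by_vector(x[1], 5) for x in iter)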
In case anyone else follows along here like I did: you need to import Annoy inside the mapPartitions function, otherwise you will still hit pickling errors. Here is a complete example based on the above:
from annoy import AnnoyIndex
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark import SparkConf
import random
random.seed(42)
f = 1024
t = AnnoyIndex(f)
allvectors = []
for i in range(100):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10)
t.save("index.ann")
def find_neighbors(i):
    from annoy import AnnoyIndex
    ai = AnnoyIndex(f)
    ai.load(SparkFiles.get("index.ann"))
    return (ai.get_nns_by_vector(vector=x[1], n=5) for x in i)
with SparkContext(conf=SparkConf().setAppName("myannoy")) as sc:
    sc.addFile("index.ann")
    sparkvectors = sc.parallelize(allvectors)
    sparkvectors.mapPartitions(find_neighbors).first()
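One last caveat: depending on your Annoy version, the AnnoyIndex constructor may require an explicit metric (older releases defaulted to angular). If you hit that, the save/addFile/load pattern stays exactly the same; only the construction changes, roughly like this sketch of the worker-side function:
from annoy import AnnoyIndex
from pyspark import SparkFiles

f = 1024

def find_neighbors(i):
    # Newer Annoy releases want the metric spelled out; "angular" matches
    # the old implicit default used in the examples above.
    ai = AnnoyIndex(f, "angular")
    ai.load(SparkFiles.get("index.ann"))
    return (ai.get_nns_by_vector(x[1], 5) for x in i)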