Elasticsearch analyze() 与 Python 中的 Spark 不兼容?
Elasticsearch analyze() not compatible with Spark in Python?
我在 PySpark 中使用 elasticsearch-py 客户端,使用 Python 3 并且我 运行 遇到了将 analyze() 函数与 ES 结合 RDD 使用的问题。特别是,我的 RDD 中的每条记录都是一串文本,我正在尝试分析它以获取令牌信息,但在 Spark 的映射函数中尝试使用它时出现错误。
例如,这工作得很好:
from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}
但是,当我尝试这样做时:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()
我收到一条与酸洗相关的非常非常长的错误消息(到此结束):
(self, obj) 109if'recursion'in.[0]: 110="""Could not pickle object as excessively deep recursion required."""--> 111 picklePicklingErrormsg
save_memoryviewself obj
: Could not pickle object as excessively deep recursion required.
raise.() 112 113def(,):PicklingError
我不确定这个错误是什么意思。难道我做错了什么?有没有办法将 ES 分析功能映射到 RDD 的记录?
编辑:我在应用来自 elasticsearch-py 的其他函数时也遇到了这种行为(例如,es.termvector())。
本质上,Elasticsearch
客户端是不可序列化的。所以你需要做的是为每个分区创建一个客户端实例,并处理它们:
def get_tokens(part):
es = Elasticsearch()
yield [es.indices.analyze(text=x)['tokens'][0] for x in part]
rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2)
rdd.mapPartitions(lambda p: get_tokens(p)).collect()
应该给出以下结果:
Out[17]:
[[{u'end_offset': 3,
u'position': 1,
u'start_offset': 0,
u'token': u'the',
u'type': u'<ALPHANUM>'}],
[{u'end_offset': 5,
u'position': 1,
u'start_offset': 0,
u'token': u'brown',
u'type': u'<ALPHANUM>'}]]
请注意,对于大型数据集,这将非常低效,因为它涉及对数据集中每个元素的 ES 的 REST 调用。
我在 PySpark 中使用 elasticsearch-py 客户端,使用 Python 3 并且我 运行 遇到了将 analyze() 函数与 ES 结合 RDD 使用的问题。特别是,我的 RDD 中的每条记录都是一串文本,我正在尝试分析它以获取令牌信息,但在 Spark 的映射函数中尝试使用它时出现错误。
例如,这工作得很好:
from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}
但是,当我尝试这样做时:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()
我收到一条与酸洗相关的非常非常长的错误消息(到此结束):
(self, obj) 109if'recursion'in.[0]: 110="""Could not pickle object as excessively deep recursion required."""--> 111 picklePicklingErrormsg
save_memoryviewself obj
: Could not pickle object as excessively deep recursion required.
raise.() 112 113def(,):PicklingError
我不确定这个错误是什么意思。难道我做错了什么?有没有办法将 ES 分析功能映射到 RDD 的记录?
编辑:我在应用来自 elasticsearch-py 的其他函数时也遇到了这种行为(例如,es.termvector())。
本质上,Elasticsearch
客户端是不可序列化的。所以你需要做的是为每个分区创建一个客户端实例,并处理它们:
def get_tokens(part):
es = Elasticsearch()
yield [es.indices.analyze(text=x)['tokens'][0] for x in part]
rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2)
rdd.mapPartitions(lambda p: get_tokens(p)).collect()
应该给出以下结果:
Out[17]:
[[{u'end_offset': 3,
u'position': 1,
u'start_offset': 0,
u'token': u'the',
u'type': u'<ALPHANUM>'}],
[{u'end_offset': 5,
u'position': 1,
u'start_offset': 0,
u'token': u'brown',
u'type': u'<ALPHANUM>'}]]
请注意,对于大型数据集,这将非常低效,因为它涉及对数据集中每个元素的 ES 的 REST 调用。