如何在每个 Spark 执行程序中加载无法 pickle 的数据?
How can I load data that can't be pickled in each Spark executor?
我正在使用 NoAho library which is written in Cython. Its internal trie cannot be pickled:如果我在主节点上加载它,我永远不会匹配在工作节点中执行的操作。
因为我想在每个 Spark 执行器中使用相同的 trie,我找到了一种延迟加载 trie 的方法,受到此 spaCy on Spark issue 的启发。
global trie
def get_match(text):
# 1. Load trie if needed
global trie
try:
trie
except NameError:
from noaho import NoAho
trie = NoAho()
trie.add(key_text='ms windows', payload='Windows 2000')
trie.add(key_text='ms windows 2000', payload='Windows 2000')
trie.add(key_text='windows 2k', payload='Windows 2000')
...
# 2. Find an actual match to get they payload back
return trie.findall_long(text)
虽然这有效,但每个 Spark 作业都会执行所有 .add()
调用,这大约需要一分钟。由于我不确定 "Spark job" 是正确的术语,我会更明确:我在 Jupyter 笔记本中使用 Spark,每次我 运行 一个需要 get_match()
的单元格功能,trie 永远不会被缓存,并且需要一分钟来加载 trie,这占 运行 时间。
我能做些什么来确保 trie 被缓存?或者我的问题有更好的解决方案吗?
您可以尝试的一件事是使用单例模块来加载和初始化 trie
。基本上你所需要的只是一个像这样的独立模块:
trie_loader.py
from noaho import NoAho
def load():
trie = NoAho()
trie.add('ms windows', 'Windows 2000')
trie.add('ms windows 2000', 'Windows 2000')
trie.add('windows 2k', 'Windows 2000')
return trie
trie = load()
并使用标准 Spark 工具分发它:
sc.addPyFile("trie_loader.py")
import trie_loader
rdd = sc.parallelize(["ms windows", "Debian GNU/Linux"])
rdd.map(lambda x: (x, trie_loader.trie.find_long(x))).collect()
## [('ms windows', (0, 10, 'Windows 2000')),
## ('Debian GNU/Linux', (None, None, None))]
这应该在每次 Python 进程执行器启动时加载所需的数据,而不是在访问数据时加载它。我不确定它是否对这里有帮助,但值得一试。
我正在使用 NoAho library which is written in Cython. Its internal trie cannot be pickled:如果我在主节点上加载它,我永远不会匹配在工作节点中执行的操作。
因为我想在每个 Spark 执行器中使用相同的 trie,我找到了一种延迟加载 trie 的方法,受到此 spaCy on Spark issue 的启发。
global trie
def get_match(text):
# 1. Load trie if needed
global trie
try:
trie
except NameError:
from noaho import NoAho
trie = NoAho()
trie.add(key_text='ms windows', payload='Windows 2000')
trie.add(key_text='ms windows 2000', payload='Windows 2000')
trie.add(key_text='windows 2k', payload='Windows 2000')
...
# 2. Find an actual match to get they payload back
return trie.findall_long(text)
虽然这有效,但每个 Spark 作业都会执行所有 .add()
调用,这大约需要一分钟。由于我不确定 "Spark job" 是正确的术语,我会更明确:我在 Jupyter 笔记本中使用 Spark,每次我 运行 一个需要 get_match()
的单元格功能,trie 永远不会被缓存,并且需要一分钟来加载 trie,这占 运行 时间。
我能做些什么来确保 trie 被缓存?或者我的问题有更好的解决方案吗?
您可以尝试的一件事是使用单例模块来加载和初始化 trie
。基本上你所需要的只是一个像这样的独立模块:
trie_loader.py
from noaho import NoAho def load(): trie = NoAho() trie.add('ms windows', 'Windows 2000') trie.add('ms windows 2000', 'Windows 2000') trie.add('windows 2k', 'Windows 2000') return trie trie = load()
并使用标准 Spark 工具分发它:
sc.addPyFile("trie_loader.py")
import trie_loader
rdd = sc.parallelize(["ms windows", "Debian GNU/Linux"])
rdd.map(lambda x: (x, trie_loader.trie.find_long(x))).collect()
## [('ms windows', (0, 10, 'Windows 2000')),
## ('Debian GNU/Linux', (None, None, None))]
这应该在每次 Python 进程执行器启动时加载所需的数据,而不是在访问数据时加载它。我不确定它是否对这里有帮助,但值得一试。