Using Dask throws ImportError when SageMath code is run in python
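This question is very similar to my earlier question and was prompted by one of the comments on it.
Recently I have been trying to parallelize some code with Dask. The code involves computations in SageMath, but whenever I use Sage code inside a function I am trying to parallelize, it throws an ImportError, even though Sage has loaded successfully. I would like to know why I get an ImportError when Sage appears to have loaded, and, more importantly, how to fix it.
Here is a basic example of what I'm running. When I run this: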
import time
from sage.all import *
from dask import delayed
from dask.distributed import Client
client = Client(n_workers=4)
#I can add Sage integers with no problem
#So Sage seems to be loaded
Integer(1)+Integer(1)
def Hello():
    Integer(1)+Integer(1)  # if I remove this line the code runs fine
    return 'Hello World'
z = delayed(Hello)()
z.compute()
I get this error:
ImportError Traceback (most recent call last)
<timed eval> in <module>
~/.sage/local/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
284 dask.base.compute
285 """
--> 286 (result,) = compute(self, traverse=False, **kwargs)
287 return result
288
~/.sage/local/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
566 postcomputes.append(x.__dask_postcompute__())
567
--> 568 results = schedule(dsk, keys, **kwargs)
569 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
570
~/.sage/local/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2669 should_rejoin = False
2670 try:
-> 2671 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2672 finally:
2673 for f in futures.values():
~/.sage/local/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1946 else:
1947 local_worker = None
-> 1948 return self.sync(
1949 self._gather,
1950 futures,
~/.sage/local/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
843 return future
844 else:
--> 845 return sync(
846 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
847 )
~/.sage/local/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
324 if error[0]:
325 typ, exc, tb = error[0]
--> 326 raise exc.with_traceback(tb)
327 else:
328 return result[0]
~/.sage/local/lib/python3.9/site-packages/distributed/utils.py in f()
307 if callback_timeout is not None:
308 future = asyncio.wait_for(future, callback_timeout)
--> 309 result[0] = yield future
310 except Exception:
311 error[0] = sys.exc_info()
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/.sage/local/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1811 exc = CancelledError(key)
1812 else:
-> 1813 raise exception.with_traceback(traceback)
1814 raise exc
1815 if errors == "skip":
~/.sage/local/lib/python3.9/site-packages/distributed/protocol/pickle.py in loads()
73 return pickle.loads(x, buffers=buffers)
74 else:
---> 75 return pickle.loads(x)
76 except Exception:
77 logger.info("Failed to deserialize %s", x[:10000], exc_info=True)
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/integer.pyx in init sage.rings.integer (build/cythonized/sage/rings/integer.c:54201)()
----> 1 r"""
2 Elements of the ring `\ZZ` of integers
3
4 Sage has highly optimized and extensive functionality for arithmetic with integers
5 and the ring of integers.
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/rational.pyx in init sage.rings.rational (build/cythonized/sage/rings/rational.cpp:40442)()
98
99
--> 100 import sage.rings.real_mpfr
101 import sage.rings.real_double
102 from libc.stdint cimport uint64_t
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/real_mpfr.pyx in init sage.rings.real_mpfr (build/cythonized/sage/rings/real_mpfr.c:46795)()
----> 1 r"""
2 Arbitrary Precision Real Numbers
3
4 AUTHORS:
5
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/libs/mpmath/utils.pyx in init sage.libs.mpmath.utils (build/cythonized/sage/libs/mpmath/utils.c:9062)()
----> 1 """
2 Utilities for Sage-mpmath interaction
3
4 Also patches some mpmath functions for speed
5 """
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/complex_mpfr.pyx in init sage.rings.complex_mpfr (build/cythonized/sage/rings/complex_mpfr.c:34594)()
----> 1 """
2 Arbitrary Precision Floating Point Complex Numbers
3
4 AUTHORS:
5
/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/complex_double.pyx in init sage.rings.complex_double (build/cythonized/sage/rings/complex_double.c:25284)()
96 from cypari2.convert cimport new_gen_from_double, new_t_COMPLEX_from_double
97
---> 98 from . import complex_mpfr
99
100 from .complex_mpfr import ComplexField
ImportError: cannot import name complex_mpfr
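Maybe this has something to do with Dask not importing Sage when it parallelizes?
Unfortunately, you may be out of luck here (sort of). It looks like sage was not developed with threaded execution driven by another language in mind: its root-level modules modify key parts of the Python environment and, by default, really try to take control of low-level functionality. For example, sage.__init__ modifies how inspect and sqlite work (gross!).
The specific problem you are running into is that importing sage invokes the signal module, and signal handlers can only be installed from the main thread. The problem is not the sage operation itself but the import statement: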
In [8]: def hello_sage():
   ...:     from sage.all import Integer
   ...:     return 'Hello World'
...:
In [9]: futures = client.submit(hello_sage)
In [10]: distributed.worker - WARNING - Compute Failed
Function: hello_sage
args: ()
kwargs: {}
Exception: ValueError('signal only works in main thread of the main interpreter')
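Unfortunately, this makes sage fairly incompatible with dask, which runs all delayed tasks inside threads. It's not that dask can't import modules locally inside remote functions (it definitely can); it's that those functions can't use signal to control execution.
Because of the way sage is written, I think your only option as far as multithreading goes is to use the parallelization options its developers provide. That said, you can trick sage into thinking it is in its own world by having the threads launch their own subprocesses: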
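The thread restriction comes from Python's own signal module, not from Dask or Sage. A minimal sketch, using only the standard library (no Sage or Dask needed), that tries to install a SIGINT handler first from the main thread and then from a plain worker thread, which stands in here for a Dask worker thread:

```python
import signal
import threading


def try_install_handler():
    """Try to install a SIGINT handler; return the exception raised, or None."""
    try:
        # Re-installing Python's default SIGINT handler is harmless,
        # but signal.signal() itself is only allowed in the main thread.
        signal.signal(signal.SIGINT, signal.default_int_handler)
        return None
    except ValueError as exc:
        return exc


# Main thread: the call succeeds.
main_result = try_install_handler()

# Worker thread (analogous to where Dask runs delayed tasks): ValueError.
results = []
worker = threading.Thread(target=lambda: results.append(try_install_handler()))
worker.start()
worker.join()
worker_result = results[0]

print("main thread:", main_result)
print("worker thread:", worker_result)
```

Any library that calls signal.signal() at import time will therefore fail the same way when it is first imported inside a worker thread.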
In [1]: import dask.distributed as dd
In [2]: from subprocess import Popen, PIPE
In [3]: def invoke_sage_cli():
   ...:     cmd = ["sage", "-c", "print(factor(35))"]
   ...:     p = Popen(cmd, stdout=PIPE, stderr=PIPE, text=True)
   ...:     o, e = p.communicate()
   ...:
   ...:     if e:
   ...:         raise SystemError(e)
   ...:
   ...:     return o
   ...:
In [4]: client = dd.Client(n_workers=4)
In [5]: future = client.submit(invoke_sage_cli)
In [6]: print(future.result())
5 * 7
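This is a very hacky way of solving the problem, and as long as you are working on a single machine I doubt it will offer much performance benefit over sage's native parallelization options. If you are using dask to scale out over a Kubernetes cluster, nodes on an HPC system, or something similar, you could definitely use this route to schedule distributed jobs and then let sage manage multithreading within each node.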
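A slightly more defensive variant of invoke_sage_cli is sketched below, assuming the sage executable is on PATH on every worker. It uses subprocess.run with check=True, so failure is judged by the exit status rather than by anything appearing on stderr (Sage may print warnings there even when the snippet succeeds), and it takes the snippet as an argument so one function can be mapped over many inputs. The name run_sage_snippet and its parameters are hypothetical, not part of Sage or Dask.

```python
import subprocess
import sys


def run_sage_snippet(code, sage_cmd="sage", timeout=None):
    """Run `code` in a fresh Sage process and return its stdout (hypothetical helper).

    Assumes `sage_cmd` is on PATH and accepts `-c CODE`, the same invocation
    used in invoke_sage_cli. Any interpreter that accepts `-c` (for example
    sys.executable) can be substituted, which is handy for testing.
    A non-zero exit status raises subprocess.CalledProcessError, whose
    .stderr attribute holds the error text.
    """
    completed = subprocess.run(
        [sage_cmd, "-c", code],
        capture_output=True,  # collect stdout/stderr instead of inheriting them
        text=True,            # decode output to str
        check=True,           # raise on non-zero exit status
        timeout=timeout,      # None means wait indefinitely
    )
    return completed.stdout


if __name__ == "__main__":
    # Stand-in demo with the current Python interpreter instead of sage.
    print(run_sage_snippet("print(6 * 7)", sage_cmd=sys.executable), end="")
```

With Dask, something like `futures = client.map(run_sage_snippet, ["print(factor(35))", "print(factor(77))"])` followed by `client.gather(futures)` would run each snippet in its own Sage process, whose main thread is free to install signal handlers; this usage is untested here.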