Using Dask throws ImportError when SageMath code is run in python

This question is very similar to my earlier question and was prompted by one of the comments on it.

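Recently I've been trying to parallelize some code using Dask. The code involves computations in SageMath, but whenever I use Sage code inside a function I'm trying to parallelize, it throws an ImportError, even though Sage has loaded successfully. I'd like to know why I get an ImportError when Sage appears to have loaded fine and, more importantly, how to fix it.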

Here is a basic example of what I've been running. When I run this:

import time
from sage.all import *
from dask import delayed
from dask.distributed import Client


client = Client(n_workers=4)

#I can add Sage integers with no problem
#So Sage seems to be loaded
Integer(1)+Integer(1) 

def Hello():
    Integer(1)+Integer(1) #if I remove this line the code runs fine
    return 'Hello World'

z = delayed(Hello)()
z.compute()

I get this error:

ImportError                               Traceback (most recent call last)
<timed eval> in <module>

~/.sage/local/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/.sage/local/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/.sage/local/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():

~/.sage/local/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,

~/.sage/local/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )

~/.sage/local/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

~/.sage/local/lib/python3.9/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/.sage/local/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":

~/.sage/local/lib/python3.9/site-packages/distributed/protocol/pickle.py in loads()
     73             return pickle.loads(x, buffers=buffers)
     74         else:
---> 75             return pickle.loads(x)
     76     except Exception:
     77         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/integer.pyx in init sage.rings.integer (build/cythonized/sage/rings/integer.c:54201)()
----> 1 r"""
      2 Elements of the ring `\ZZ` of integers
      3 
      4 Sage has highly optimized and extensive functionality for arithmetic with integers
      5 and the ring of integers.

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/rational.pyx in init sage.rings.rational (build/cythonized/sage/rings/rational.cpp:40442)()
     98 
     99 
--> 100 import sage.rings.real_mpfr
    101 import sage.rings.real_double
    102 from libc.stdint cimport uint64_t

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/real_mpfr.pyx in init sage.rings.real_mpfr (build/cythonized/sage/rings/real_mpfr.c:46795)()
----> 1 r"""
      2 Arbitrary Precision Real Numbers
      3 
      4 AUTHORS:
      5 

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/libs/mpmath/utils.pyx in init sage.libs.mpmath.utils (build/cythonized/sage/libs/mpmath/utils.c:9062)()
----> 1 """
      2 Utilities for Sage-mpmath interaction
      3 
      4 Also patches some mpmath functions for speed
      5 """

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/complex_mpfr.pyx in init sage.rings.complex_mpfr (build/cythonized/sage/rings/complex_mpfr.c:34594)()
----> 1 """
      2 Arbitrary Precision Floating Point Complex Numbers
      3 
      4 AUTHORS:
      5 

/var/tmp/sage-9.4-current/local/lib/python3.9/site-packages/sage/rings/complex_double.pyx in init sage.rings.complex_double (build/cythonized/sage/rings/complex_double.c:25284)()
     96 from cypari2.convert cimport new_gen_from_double, new_t_COMPLEX_from_double
     97 
---> 98 from . import complex_mpfr
     99 
    100 from .complex_mpfr import ComplexField

ImportError: cannot import name complex_mpfr

Perhaps this has something to do with Dask not importing Sage when it parallelizes the work?

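Unfortunately, you may be out of luck here (sort of). It looks like sage wasn't developed with threaded execution driven from another language in mind: its root-level modules modify key parts of the Python environment and really try to take control of low-level functionality by default. For example, sage.__init__ modifies how inspect and sqlite work (gross!).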

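The specific problem you're running into is that importing sage uses the signal module to install signal handlers, and Python only allows that from the main thread. The problem isn't the sage operation itself but the import statement: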

In [8]: def hello_sage():
   ...:     from sage.all import Integer
   ...:     return 'Hello World'
   ...:

In [9]: futures = client.submit(hello_sage)

In [10]: distributed.worker - WARNING - Compute Failed
Function:  hello_sage
args:      ()
kwargs:    {}
Exception: ValueError('signal only works in main thread of the main interpreter')
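To see why the import itself is what fails, here is a minimal stdlib-only sketch that needs neither sage nor dask. It assumes a hypothetical module fake_sage whose top-level code installs a SIGINT handler, standing in for what importing sage does, and it uses concurrent.futures.ThreadPoolExecutor only as a rough analogy for the thread pool a Dask worker runs tasks in (it is not Dask itself).

```python
import importlib
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

# Hypothetical stand-in for a package (such as sage) whose top-level code
# installs a signal handler as a side effect of being imported.
MODULE_SOURCE = (
    "import signal\n"
    "signal.signal(signal.SIGINT, signal.default_int_handler)\n"
)


def make_module(name, source):
    """Write `source` to a temporary <name>.py and make it importable."""
    tmpdir = tempfile.mkdtemp()
    Path(tmpdir, f"{name}.py").write_text(source)
    sys.path.insert(0, tmpdir)
    importlib.invalidate_caches()


def hello():
    # Importing inside the task runs the module's top-level code in whatever
    # thread executes the task; off the main thread, signal.signal raises.
    importlib.import_module("fake_sage")
    return "Hello World"


make_module("fake_sage", MODULE_SOURCE)

# A thread pool loosely mimics how a Dask worker executes tasks.
with ThreadPoolExecutor(max_workers=4) as pool:
    thread_error = pool.submit(hello).exception()
print("in pool thread:", repr(thread_error))

# The same import succeeds on the main thread: the failed attempt above was
# removed from sys.modules, so the module body runs again here.
print("in main thread:", hello())
```

The exact ValueError text can differ slightly between Python versions, but it is the same failure the worker log above reports.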

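Unfortunately, this makes sage fairly incompatible with dask, which runs all delayed tasks inside threads. It's not that dask can't import modules locally inside remote functions (it definitely can); it's that those functions can't use signal to control execution.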

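Because of the way sage is written, I think your only option as far as multithreading goes is to use the parallelization options its developers provide. That said, you can trick sage into thinking it's in its own world by having the threads launch their own subprocesses: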

In [1]: import dask.distributed as dd

In [2]: from subprocess import Popen, PIPE

In [3]: def invoke_sage_cli():
   ...:     cmd = ["sage", "-c", "print(factor(35))"]
   ...:     p = Popen(cmd, stdout=PIPE, stderr=PIPE, text=True)
   ...:     o, e = p.communicate()
   ...:
   ...:     if e:
   ...:         raise SystemError(e)
   ...:
   ...:     return o
   ...:

In [4]: client = dd.Client(n_workers=4)

In [5]: future = client.submit(invoke_sage_cli)

In [6]: print(future.result())
5 * 7
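The subprocess trick can be packaged as a reusable task. This is a sketch of a variant of invoke_sage_cli, not a tested recipe: the helper names build_sage_command, run_in_subprocess and run_sage_expression are hypothetical, it assumes a sage executable on PATH, and it treats a non-zero exit code (rather than any output on stderr) as the failure signal, since a process can write harmless warnings to stderr and still succeed.

```python
import subprocess


def build_sage_command(expression, sage_executable="sage"):
    """Command line that evaluates `expression` in a fresh Sage process and
    prints the result. Assumes a `sage` executable on PATH by default."""
    return [sage_executable, "-c", f"print({expression})"]


def run_in_subprocess(cmd, timeout=None):
    """Run `cmd` in a child process and return its standard output.

    Failure is detected from the exit code rather than from stderr output,
    so warnings written to stderr do not abort the task."""
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if result.returncode != 0:
        raise RuntimeError(
            f"{cmd[0]} exited with code {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout


def run_sage_expression(expression, timeout=None):
    """Hypothetical Dask task: each call gets its own Sage process, whose
    main thread is free to install signal handlers during import."""
    return run_in_subprocess(build_sage_command(expression), timeout=timeout)
```

With a client like the one above, something like futures = client.map(run_sage_expression, ["factor(35)", "factor(77)"]) followed by client.gather(futures) would run each expression in its own Sage process. Every call pays Sage's startup cost, so this only pays off when each computation is substantial.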

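This is a pretty hacky way to solve the problem, and as long as you're working on a single machine I doubt it will give you much performance benefit over sage's native parallelization options. If you're using dask to scale out on a Kubernetes cluster, or across nodes on an HPC system, or something similar, then you could definitely use this route to schedule distributed jobs and then let sage manage multithreading within each node.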