Is it possible to parallelize python code using Pari via cypari2?

I have not been able to run several loops in parallel when using Pari via cypari2. I'll include a couple of minimal working examples along with the tracebacks, in case anyone has some insight into this.

Example 1 -- using joblib:

from cypari2 import Pari
from joblib import Parallel, delayed

pari = Pari()

def AddOne(v):
    return v + pari.one()

vec = [pari('x_1'), pari('x_2')]
print(vec)

# works
newVec = Parallel(n_jobs=1)(delayed(AddOne)(i) for i in vec)
print(newVec)

# doesn't work
newVec2 = Parallel(n_jobs=2)(delayed(AddOne)(i) for i in vec)
print(newVec2)

Output:

[x_1, x_2]
[x_1 + 1, x_2 + 1]
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/joblib/externals/loky/backend/queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/usr/lib/python3/dist-packages/joblib/externals/loky/backend/reduction.py", line 247, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/usr/lib/python3/dist-packages/joblib/externals/loky/backend/reduction.py", line 240, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/usr/lib/python3/dist-packages/joblib/externals/cloudpickle/cloudpickle_fast.py", line 538, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in cypari2.pari_instance.Pari.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "min_jake_joblib.py", line 16, in <module>
    newVec2 = Parallel(n_jobs=2)(delayed(AddOne)(i) for i in vec)
  File "/usr/lib/python3/dist-packages/joblib/parallel.py", line 1016, in __call__
    self.retrieve()
  File "/usr/lib/python3/dist-packages/joblib/parallel.py", line 908, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/lib/python3/dist-packages/joblib/_parallel_backends.py", line 554, in wrap_future_result
    return future.result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.

It seems to be a problem with pickling the Pari objects, but is there any workaround?
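
For reference, the pickling failure can be reproduced without joblib at all. The traceback shows cloudpickle failing on the Pari instance that AddOne references as a global, and a bare pickle call on that instance hits the same error. A minimal sketch (assuming only a standard cypari2 install):

import pickle
from cypari2 import Pari

pari = Pari()

# Pickling the Pari instance directly reproduces the error from the
# joblib traceback: the class defines no pickle support.
try:
    pickle.dumps(pari)
except TypeError as e:
    print(e)  # no default __reduce__ due to non-trivial __cinit__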

Example 2 -- using multiprocessing:

from cypari2 import Pari
import multiprocessing

pari = Pari()

def AddOne(v):
    return v + pari.one()

vec = [pari('x_1'), pari('x_2')]
print(vec)

# doesn't work
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2)  ## doesn't matter how many I use
    newVec = pool.map(AddOne, (i for i in vec))
    print(newVec)

It seg faults, but doesn't fully exit on its own, so I had to kill it with Ctrl+C. Output:

[x_1, x_2]
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 576, in _handle_results
    task = get()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "cypari2/gen.pyx", line 4705, in cypari2.gen.objtogen
  File "cypari2/gen.pyx", line 4812, in cypari2.gen.objtogen
  File "cypari2/convert.pyx", line 557, in cypari2.convert.PyObject_AsGEN
cysignals.signals.SignalError: Segmentation fault
^CProcess ForkPoolWorker-1:
Process ForkPoolWorker-2:
Traceback (most recent call last):
  File "min_jake_multiprocessing.py", line 14, in <module>
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 356, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "src/cysignals/signals.pyx", line 320, in cysignals.signals.python_check_interrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "src/cysignals/signals.pyx", line 320, in cysignals.signals.python_check_interrupt
KeyboardInterrupt
KeyboardInterrupt
    newVec = pool.map(AddOne, (i for i in vec))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 765, in get
    self.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 762, in wait
    self._event.wait(timeout)
  File "/usr/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
  File "src/cysignals/signals.pyx", line 320, in cysignals.signals.python_check_interrupt
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 695, in _terminate_pool
    raise AssertionError(
AssertionError: Cannot have cache with result_hander not alive

I suppose someone will tell me to switch to sympy or some other symbolic algebra package, but the symbolic algebra I need to do is quite complicated and Pari handles it very well. Ultimately, though, I want to be able to process a queue of class objects containing Pari objects in parallel. Any thoughts/suggestions are appreciated.
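
One possible workaround, sketched here under the assumption that the Gen values of interest round-trip faithfully through their string representation: pass only strings across the process boundary and rebuild the Gen objects inside each worker. The helper add_one_str below is illustrative, not part of cypari2:

from cypari2 import Pari
import multiprocessing

pari = Pari()

def add_one_str(s):
    # Rebuild the Gen from its string form inside the worker, compute,
    # and return a plain str, so only ordinary Python objects get pickled.
    return str(pari(s) + pari.one())

if __name__ == '__main__':
    vec = [pari('x_1'), pari('x_2')]
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.map(add_one_str, [str(v) for v in vec])
    print([pari(s) for s in results])  # convert back to Gen in the parent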

Well, this isn't a complete answer, but it works for me, so I wanted to share it in case anyone else runs into this problem.

The first problem seems to be that the versions of libpari-dev and pari-gp in the apt repository are too old. The apt repository has version 2.11, while the version in the Pari git repository is 2.14. Uninstalling both and installing from source, following the instructions here, solved most of my problems.

Interestingly, I still needed to install libpari-gmp-tls6 from the apt repository to get everything working. After that, though, I was able to get the test examples above to run. The example using multiprocessing succeeded without modification, but the example using joblib needed the "threading" backend in order to run.
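
For the joblib case, switching the backend is a one-argument change. The threading backend shares the single Pari instance between workers instead of pickling it, which is what made the default process-based backend fail. A minimal sketch of the switch described above:

from cypari2 import Pari
from joblib import Parallel, delayed

pari = Pari()

def AddOne(v):
    return v + pari.one()

vec = [pari('x_1'), pari('x_2')]

# backend="threading" keeps all workers in one process, so nothing
# needs to be pickled.
newVec2 = Parallel(n_jobs=2, backend="threading")(delayed(AddOne)(i) for i in vec)
print(newVec2)

Whether this gives true parallelism depends on how much of the underlying PARI computation runs with the GIL released.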