如何并行执行 python 个子测试?
How to execute python subTests in parallel?
考虑以下 unittest.TestCase
,它实现了同一测试的两个版本,唯一的区别是其中一个使用 multiprocessing
.[=33= 并行执行 subTest
s ]
import multiprocessing as mp
from unittest import TestCase
class TestBehaviour(TestCase):
def _test_equals(self, val):
for target_val in [1, 2]:
with self.subTest(target=target_val, source=val):
self.assertEqual(val, target_val)
def test_equality_parallel(self):
with mp.Pool(processes=4) as pool:
pool.map(self._test_equals, [1, 2])
pool.join()
pool.close()
def test_equality(self):
for val in [1, 2]:
self._test_equals(val)
串行版本 test_equality
按预期工作并产生以下测试失败:
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 1 != 2
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 2 != 1
另一方面,test_equality_parallel
导致错误,因为 TestCase
无法被腌制:
Traceback (most recent call last):
File "temp.py", line 15, in test_equality_parallel
pool.map(self._test_equals, [1, 2])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
raise self._value
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
put(task)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
现在我知道我可以将 _test_equals
拆分为 class 之外的独立函数;但是,我想保留 subTest
的行为,以便更好地记录(和后续调试)测试失败。
我怎样才能 运行 并行测试,同时保持这个 subTest
功能?
更新
我也尝试过使用 pathos.multiprocessing.ProcessingPool
来规避 TestCase
序列化的问题;但是,在这种情况下 pool.join()
引发 ValueError: Pool is still running
.
from pathos.multiprocessing import ProcessingPool
...
def test_equality_parallel(self):
pool = ProcessingPool(processes=4)
pool.map(self._test_equals, [1, 2])
pool.join()
更新 2
This question 绝对相关。提出的第一个解决方案,为要从子进程调用的方法创建第二个 class,是不合适的,因为它不会启用 subTest
的使用。第二,从 TestCase
中删除不可腌制的 _Outcome
对象似乎很老套,而且考虑到子进程是 运行ning subTests
,看起来也不合适。
我是 pathos
(以及 dill
和 multiprocess
)的作者。您仍然看到跨进程 pathos
的序列化错误。您可以尝试跨线程序列化。线程并行可能适合这种级别的功能。
import multiprocess.dummy as mp
from unittest import TestCase
class TestBehaviour(TestCase):
def _test_equals(self, val):
for target_val in [1, 2]:
with self.subTest(target=target_val, source=val):
self.assertEqual(val, target_val)
def test_equality_parallel(self):
with mp.Pool(processes=4) as pool:
pool.map(self._test_equals, [1, 2])
pool.join()
pool.close()
def test_equality(self):
for val in [1, 2]:
self._test_equals(val)
产生:
======================================================================
FAIL: test_equality (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
======================================================================
FAIL: test_equality_parallel (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
----------------------------------------------------------------------
Ran 2 tests in 0.108s
FAILED (failures=2)
这告诉我,您可以使用 dill
(即 dill.settings
内)的序列化变体来解决序列化问题。参见:https://github.com/uqfoundation/multiprocess/issues/48。
考虑以下 unittest.TestCase
,它实现了同一测试的两个版本,唯一的区别是其中一个使用 multiprocessing
.[=33= 并行执行 subTest
s ]
import multiprocessing as mp
from unittest import TestCase
class TestBehaviour(TestCase):
def _test_equals(self, val):
for target_val in [1, 2]:
with self.subTest(target=target_val, source=val):
self.assertEqual(val, target_val)
def test_equality_parallel(self):
with mp.Pool(processes=4) as pool:
pool.map(self._test_equals, [1, 2])
pool.join()
pool.close()
def test_equality(self):
for val in [1, 2]:
self._test_equals(val)
串行版本 test_equality
按预期工作并产生以下测试失败:
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 1 != 2
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
File "temp.py", line 11, in _test_equals
self.assertEqual(val, target_val)
AssertionError: 2 != 1
另一方面,test_equality_parallel
导致错误,因为 TestCase
无法被腌制:
Traceback (most recent call last):
File "temp.py", line 15, in test_equality_parallel
pool.map(self._test_equals, [1, 2])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
raise self._value
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
put(task)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
现在我知道我可以将 _test_equals
拆分为 class 之外的独立函数;但是,我想保留 subTest
的行为,以便更好地记录(和后续调试)测试失败。
我怎样才能 运行 并行测试,同时保持这个 subTest
功能?
更新
我也尝试过使用 pathos.multiprocessing.ProcessingPool
来规避 TestCase
序列化的问题;但是,在这种情况下 pool.join()
引发 ValueError: Pool is still running
.
from pathos.multiprocessing import ProcessingPool
...
def test_equality_parallel(self):
pool = ProcessingPool(processes=4)
pool.map(self._test_equals, [1, 2])
pool.join()
更新 2
This question 绝对相关。提出的第一个解决方案,为要从子进程调用的方法创建第二个 class,是不合适的,因为它不会启用 subTest
的使用。第二,从 TestCase
中删除不可腌制的 _Outcome
对象似乎很老套,而且考虑到子进程是 运行ning subTests
,看起来也不合适。
我是 pathos
(以及 dill
和 multiprocess
)的作者。您仍然看到跨进程 pathos
的序列化错误。您可以尝试跨线程序列化。线程并行可能适合这种级别的功能。
import multiprocess.dummy as mp
from unittest import TestCase
class TestBehaviour(TestCase):
def _test_equals(self, val):
for target_val in [1, 2]:
with self.subTest(target=target_val, source=val):
self.assertEqual(val, target_val)
def test_equality_parallel(self):
with mp.Pool(processes=4) as pool:
pool.map(self._test_equals, [1, 2])
pool.join()
pool.close()
def test_equality(self):
for val in [1, 2]:
self._test_equals(val)
产生:
======================================================================
FAIL: test_equality (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
======================================================================
FAIL: test_equality_parallel (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
----------------------------------------------------------------------
Ran 2 tests in 0.108s
FAILED (failures=2)
这告诉我,您可以使用 dill
(即 dill.settings
内)的序列化变体来解决序列化问题。参见:https://github.com/uqfoundation/multiprocess/issues/48。