Failed to unit test PyFlink UDF
I am using PyFlink and I want to unit test my UDFs written in Python.
I want to test the simple UDF below:
# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT(), DataTypes.INT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
I created a test file that should fail:
from tasks.helloworld.udf import add

def test_add():
    assert add(1,1) == 3
Unfortunately, the test passes when I run pytest:
> pytest
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item
tests/test_helloworld.py . [100%]
============================================================================================= warnings summary =============================================================================================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
from collections import (
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291
/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
if not isinstance(input_types, collections.Iterable) \
-- Docs: https://docs.pytest.org/en/stable/warnings.html
====================================================================================== 1 passed, 6 warnings in 0.98s =======================================================================================
However, if I remove the @udf(input_types=[...], result_type=...) decorator from udf.py, the test fails as expected.
# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Comment out the udf decorator
# @udf(input_types=[DataTypes.INT(), DataTypes.INT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
Result:
> pytest
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item
tests/test_helloworld.py F [100%]
================================================================================================= FAILURES =================================================================================================
_________________________________________________________________________________________________ test_add _________________________________________________________________________________________________
    def test_add():
>       assert add(1,1) == 3
E       assert 2 == 3
E        +  where 2 = add(1, 1)
tests/test_helloworld.py:4: AssertionError
============================================================================================= warnings summary =============================================================================================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
from collections import (
-- Docs: https://docs.pytest.org/en/stable/warnings.html
========================================================================================= short test summary info ==========================================================================================
FAILED tests/test_helloworld.py::test_add - assert 2 == 3
====================================================================================== 1 failed, 5 warnings in 0.17s =======================================================================================
The complete example can be found at https://github.com/YikSanChan/how-to-pytest-flink.
I am porting Dian Fu's answer from the Apache Flink user mailing list, which solved my problem.
As the udf add is decorated with the @udf decorator, it is no longer a simple Python function when you reference add. If you execute print(type(add(1, 1))), you will see the output is something like "<class 'pyflink.table.expression.Expression'>".
You could try the following code: assert add._func(1, 1) == 3
add._func holds the original Python function, so this assertion calls the real add and fails as expected.
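To see why the decorated version made the test pass, here is a minimal stand-in that runs without PyFlink. Everything named Fake* or fake_udf is hypothetical and is not PyFlink's implementation; it only assumes, as the answer describes, that calling the wrapped function builds an expression object rather than computing a value, and that the wrapper keeps the original function in _func. It also assumes that comparing such an object with == yields another object, which is truthy, so a bare assert on it cannot fail.

```python
# Illustration only: a stand-in for the wrapping behaviour, NOT PyFlink code.

class FakeExpression:
    """Hypothetical placeholder for the lazy expression a wrapped UDF returns."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __eq__(self, other):
        # The comparison builds another expression instead of returning a bool.
        return FakeExpression("eq", (self, other))


class FakeUdfWrapper:
    """Hypothetical wrapper that keeps the original function in _func."""

    def __init__(self, func):
        self._func = func

    def __call__(self, *args):
        # Calling the wrapper does not run the function; it records the call.
        return FakeExpression(self._func, args)


def fake_udf(func):
    """Hypothetical decorator standing in for @udf(...)."""
    return FakeUdfWrapper(func)


@fake_udf
def add(i, j):
    return i + j


def test_add_through_wrapper_is_misleading():
    # An object without __bool__ or __len__ is truthy, so this never fails.
    assert add(1, 1) == 3


def test_add_through_original_function():
    # Going through _func reaches the plain Python function.
    assert add._func(1, 1) == 2


if __name__ == "__main__":
    test_add_through_wrapper_is_misleading()
    test_add_through_original_function()
    print(type(add(1, 1)).__name__)
```

With the real decorator, the same idea means asserting on add._func(1, 1) in tests/test_helloworld.py, as the answer suggests.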