有没有办法测试 PySpark Regex 的?
Is there a way to test PySpark Regex's?
我想测试 PySpark 正则表达式的不同输入以查看它们是否 fail/succeed 在 运行 构建之前。有没有办法在 运行 完整 build/checks 之前在 Foundry 中对此进行测试?
您可以使用创作中的预览功能对输入进行缩减采样,然后您可以在其中指定要制作输入以进行测试的过滤器。
然后,您可以 运行 此自定义示例上的 PySpark 代码来验证它是否符合您的预期。
点击Preview
按钮后,您在下面的视图中点击了齿轮。
然后,您可以描述您想要的样本。
完成此操作后,运行在您的输入中使用正则表达式将快速且易于测试。
我也是编写单元测试的爱好者。创建一个小的输入 df,所需的输出 df,并编写一个简单的函数来获取输入、应用正则表达式和 returns 输出。
import pytest
from datetime import date
import pandas as pd # noqa
import numpy as np
from myproject.analysis.simple_discount import (
calc
)
columns = [
"date",
"id",
"other",
"brand",
"grp_id",
"amounth",
"pct",
"max_amount",
"unit",
"total_units"
]
output_columns = [
"date",
"id",
"other",
"brand",
"grp_id",
"amount",
"pct",
"max_amount",
"qty",
"total_amount"
]
@pytest.fixture
def input_df(spark_session):
data = [
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 3],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
]
pdf = pd.DataFrame(data, columns=columns)
pdf = pdf.replace({np.nan: None})
return spark_session.createDataFrame(pdf)
@pytest.fixture
def output_df(spark_session):
data = [
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 27, 14.580000000000002],
['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 1, 1.3],
]
pdf = pd.DataFrame(data, columns=columns)
pdf = pdf.replace({np.nan: None})
return spark_session.createDataFrame(pdf)
# ======= FIRST RUN CASE
def test_normal_input(input_df, output_df):
calc_output_df = calc(input_df)
assert sorted(calc_output_df.collect()) == sorted(output_df.collect())
#
# Folder Structure
#
# transforms-python/
# ├── ...
# └── src/
# ├── ...
# ├── myproject/
# │ ├── ...
# │ └── analysis/
# │ ├── ...
# │ └── simple_discounts.py
# └── tests/
# ├── ...
# └── unit_tests.py
我想测试 PySpark 正则表达式的不同输入以查看它们是否 fail/succeed 在 运行 构建之前。有没有办法在 运行 完整 build/checks 之前在 Foundry 中对此进行测试?
您可以使用创作中的预览功能对输入进行缩减采样,然后您可以在其中指定要制作输入以进行测试的过滤器。
然后,您可以 运行 此自定义示例上的 PySpark 代码来验证它是否符合您的预期。
点击Preview
按钮后,您在下面的视图中点击了齿轮。
然后,您可以描述您想要的样本。
完成此操作后,运行在您的输入中使用正则表达式将快速且易于测试。
我也是编写单元测试的爱好者。创建一个小的输入 df,所需的输出 df,并编写一个简单的函数来获取输入、应用正则表达式和 returns 输出。
import pytest
from datetime import date
import pandas as pd # noqa
import numpy as np
from myproject.analysis.simple_discount import (
calc
)
columns = [
"date",
"id",
"other",
"brand",
"grp_id",
"amounth",
"pct",
"max_amount",
"unit",
"total_units"
]
output_columns = [
"date",
"id",
"other",
"brand",
"grp_id",
"amount",
"pct",
"max_amount",
"qty",
"total_amount"
]
@pytest.fixture
def input_df(spark_session):
data = [
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 2],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 3],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 4],
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 1.08, 1],
['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
['6/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 2.6, 1],
]
pdf = pd.DataFrame(data, columns=columns)
pdf = pdf.replace({np.nan: None})
return spark_session.createDataFrame(pdf)
@pytest.fixture
def output_df(spark_session):
data = [
['4/1/21', 'a', '1', 'mn', '567', 0.54, 50, 1.08, 27, 14.580000000000002],
['3/1/21', 'b', '2', 'mn', '555', 1.3, 50, 2.6, 1, 1.3],
]
pdf = pd.DataFrame(data, columns=columns)
pdf = pdf.replace({np.nan: None})
return spark_session.createDataFrame(pdf)
# ======= FIRST RUN CASE
def test_normal_input(input_df, output_df):
calc_output_df = calc(input_df)
assert sorted(calc_output_df.collect()) == sorted(output_df.collect())
#
# Folder Structure
#
# transforms-python/
# ├── ...
# └── src/
# ├── ...
# ├── myproject/
# │ ├── ...
# │ └── analysis/
# │ ├── ...
# │ └── simple_discounts.py
# └── tests/
# ├── ...
# └── unit_tests.py