如何在 Foundry 中测试多输出 pyspark 转换

How to test multi-output pyspark transforms in Foundry

我们想为具有多个输出(即 @transform 注释)的 python 转换编写单元测试,但无法构建我们需要传递给的 TransformOutput 对象我们正在测试的功能。
执行此操作的最佳方法是什么?

您可以如下创建假输入和输出,然后将它们传递到您的 @transform 函数中:

class FakeTransformInput:
    def __init__(self, df):
        self.df = df

    def dataframe(self):
        return self.df

    def set_mode(self, mode):
        pass
class FakeTransformOutput:
    def __init__(self, df):
        self.df = df

    def dataframe(self):
        return self.df

    def write_dataframe(
        self, df, partition_cols=None, bucket_cols=None, bucket_count=None,
            sort_by=None, output_format=None, options=None, column_descriptions=None,
            column_typeclasses=None):
        self.df = df

    def set_mode(self, mode):
        pass

并使用它们:

output_schema = StructType([
    StructField("col_1", StringType(), True),
    StructField("col_2", StringType(), True),
    StructField("col_n", StringType(), True),
])
output_transform = FakeTransformOutput(spark_session.createDataFrame([], output_schema))

input_transform = FakeTransformInput(spark_session.createDataFrame(input_df))

YOUR_MODULE.compute(
    input_transform, output_transform
)

# Perform assertions on output_transform