Palantir Foundry incremental testing is hard to iterate on, how do I find bugs faster?

I have a pipeline set up in my Foundry instance that uses incremental computation, but for some reason it isn't doing what I expect. Namely, I want to read the previous output of my transform and get the maximum value of the date column, then read the input only for data immediately after that maximum date.

For some reason it isn't doing what I expect, and it's very frustrating to step through what the code is doing via the build / analyze / modify-code loop.

My code looks like this:

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta


JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
  T.StructField("date", T.DateType()),
  T.StructField("value", T.IntegerType())
])


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
  """Filter the input to only rows that are a day after the last written output and process them"""

  # Get the previous output and full current input
  previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
  current_input_df = my_input.dataframe("current")

  # Get the next date of interest from the previous output 
  previous_max_date_rows = previous_output_df.groupBy().agg(
      F.max(F.col("date")).alias("max_date")
  ).collect() # noqa
  # PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid an expensive
  #   cross-join + filter operation, in favor of building a new query plan.

  if len(previous_max_date_rows) == 0:
    # We are running for the first time or re-snapshotting.  There's no previous date.  Use fallback.  
    previous_max_date = START_DATE
  else:
    # We have a previous max date, use it. 
    previous_max_date = previous_max_date_rows[0][0]

  delta = timedelta(days=JUMP_DAYS)
  next_date = previous_max_date + delta

  # Filter the input to only the next date
  filtered_input = current_input_df.filter(F.col("date") == F.lit(date))

  # Do any other processing...

  output_df = filtered_input

  # Persist 
  my_output.set_mode("modify")
  my_output.write_dataframe(output_df)

In incremental transforms, it can be difficult to isolate the conditions that break your code. It is therefore typically best to:

  1. Make your compute function do nothing except fetch the appropriate views of your inputs/outputs and hand those DataFrames off to inner methods
  2. Modularize each piece of your logic so that it is testable
  3. Write tests for each piece that verify each manipulation of a specific DataFrame does exactly what you expect

In your code example, breaking the execution up into a set of testable methods will make it much easier to test each piece and see where things go wrong.

The new methods would look like this:

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta


JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
  T.StructField("date", T.DateType()),
  T.StructField("value", T.IntegerType())
])


def get_previous_max_date(previous_output_df) -> date:
  """Given the previous output, get the maximum date written to it"""
  previous_max_date_rows = previous_output_df.groupBy().agg(
      F.max(F.col("date")).alias("max_date")
  ).collect() # noqa
  # PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid an expensive
  #   cross-join + filter operation, in favor of building a new query plan.

  if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
    # We are running for the first time or re-snapshotting. A global aggregation over an
    # empty DataFrame yields a single all-null row, so there's no previous date. Use the fallback.
    previous_max_date = START_DATE
  else:
    # We have a previous max date, use it.
    previous_max_date = previous_max_date_rows[0][0]
  return previous_max_date


def get_next_date(previous_output_df) -> date:
  """Given the previous output, compute the max date + 1 day"""
  previous_max_date = get_previous_max_date(previous_output_df)
  delta = timedelta(days=JUMP_DAYS)
  next_date = previous_max_date + delta
  return next_date


def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
  """Given the entire intput, filter to only rows that have the next date"""
  return current_input_df.filter(F.col("date") == F.lit(date))


def process_with_dfs(current_input_df, previous_output_df) -> DataFrame:
  """With the constructed DataFrames, do our work"""
  # Get the next date of interest from the previous output 
  next_date = get_next_date(previous_output_df)

  # Filter the input to only the next date
  filtered_input = filter_input_to_date(current_input_df, next_date)

  # Do any other processing...

  return filtered_input


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
  """Filter the input to only rows that are a day after the last written output and process them"""

  # Get the previous output and full current input
  previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
  current_input_df = my_input.dataframe("current")

  # Do the processing
  output_df = process_with_dfs(current_input_df, previous_output_df)

  # Persist 
  my_output.set_mode("modify")
  my_output.write_dataframe(output_df)

You can now set up individual unit tests, assuming your code lives at transforms-python/src/myproject/datasets/output.py and you've followed the documented methodology to get everything set up correctly.
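
If you run these tests outside the standard repository template, a minimal conftest.py sketch like the one below can supply the spark_session fixture the tests rely on (this is an assumption for local runs; the Foundry Code Repositories template normally provides this fixture for you):

# conftest.py -- a minimal sketch for running the tests locally.
# In a Foundry Code Repository this fixture is normally wired up for you.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # Build a small local SparkSession just for unit tests
    spark = (
        SparkSession.builder
        .master("local[2]")
        .appName("incremental-transform-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()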

My test file therefore now looks like this:

from pyspark.sql import functions as F, types as T
from myproject.datasets.output import (
    only_write_one_day,
    process_with_dfs,
    filter_input_to_date,
    get_next_date,
    get_previous_max_date,
    OUTPUT_SCHEMA,
    JUMP_DAYS,
    START_DATE
)
import pytest
from datetime import date


@pytest.fixture
def empty_output_df(spark_session):
    data = []
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def single_write_output_df(spark_session):
    data = [{
        "date": date(year=2021, month=10, day=1),
        "value": 1
    }]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def double_write_output_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def normal_input_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


# ======= FIRST RUN CASE

def test_first_run_process_with_dfs(normal_input_df, empty_output_df):
    assert True


def test_first_run_filter_input_to_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_next_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
    assert True


# ======= NORMAL CASE

def test_normal_run_process_with_dfs(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
    assert True

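The stubs above are placeholders for your real assertions. As a sketch of what a few of them might look like once filled in (assuming the fixtures and constants defined earlier, with JUMP_DAYS = 1 and START_DATE = 2021-10-01):

def test_first_run_get_next_date(normal_input_df, empty_output_df):
    # With no previous output we fall back to START_DATE, then jump one day
    assert get_next_date(empty_output_df) == date(year=2021, month=10, day=2)


def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
    # The single-write output contains exactly one row dated 2021-10-01
    assert get_previous_max_date(single_write_output_df) == date(year=2021, month=10, day=1)


def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
    # Only the 2021-10-02 row should survive the filter
    filtered = filter_input_to_date(normal_input_df, date(year=2021, month=10, day=2))
    assert filtered.count() == 1
    assert filtered.collect()[0]["value"] == 2
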
It's worth noting that this is why you can enable features like the McCabe complexity checker and the unit-test coverage features inside Foundry: so you can break your code up into smaller, more durable pieces like this.

Following a design pattern like this will give you much more durable code that you can trust in incremental transforms.

Adopting this style of transform also lets you iterate faster on perfecting your logic by running the individual test you care about with the "Test" feature in Code Repositories. You can open the test file and click the green "Test" button next to the specific case you're interested in, which lets you get your logic right much faster than clicking build every time and trying to line up the input conditions you want.