在 Palantir Foundry 中,由于无法使用打印语句,我该如何调试 pyspark(或 pandas)UDF?

In Palantir Foundry, how do I debug pyspark (or pandas) UDFs since I can't use print statements?

在代码工作簿中,我可以使用 print 语句,这些语句出现在代码工作簿的输出部分(通常会出现错误)。这对 UDF 不起作用,在代码 Authoring/Repositories.

中也不起作用

我可以通过哪些方式调试我的 pyspark 代码,尤其是当我使用 UDF 时?

我将解释 3 个用于 pyspark 的调试工具(并可在 Foundry 中使用):

  1. 引发异常
  2. 运行 在本地作为 pandas 系列
  3. 记录并专门记录 UDF

引发异常

查看变量(尤其是 pandas UDF)的最简单、最快捷的方法是 Raise an Exception.

def my_compute_function(my_input):
    interesting_variable = some_function(my_input)  # Want to see result of this
    raise ValueError(interesting_variable)

这通常比 reading/writing DataFrames 更容易,因为:

  1. 可以轻松地插入 raise 语句,而不会弄乱转换的 return 值或其他逻辑
  2. 不需要为你的调试语句定义一个有效的模式而乱七八糟

缺点是它会停止代码的执行。

运行 在本地作为 pandas 系列

如果您对 Pandas 更有经验,您可以使用少量数据样本,并且 运行 您可以在其中进行调试。

我以前使用的一些技术不仅仅是按行数对数据进行下采样,而是过滤数据以代表我的工作。例如,如果我正在编写一个算法来确定航班延误,我会过滤到特定日期飞往特定机场的所有航班。这样我就可以对样本进行整体测试。

日志记录

代码存储库使用 Python 的内置 logging library。这在网上被广泛记录,并允许您控制日志记录级别(错误、警告、信息)以便于过滤。

日志输出出现在输出数据集的日志文件和构建的驱动程序日志中(数据集 -> 详细信息 -> 文件 -> 日志文件,以及构建 -> 构建 -> 作业状态日志;select“驱动程序日志”。

这将允许您在日志中查看记录的信息(构建完成后),但不适用于 UDF。

登录 UDF

UDF 完成的工作是由执行程序而不是驱动程序完成的,Spark 从 top-level 驱动程序进程捕获日志输出。如果您在 PySpark 查询中使用 UDF 并且需要记录数据,请创建并调用第二个 UDF,该 UDF return 包含您希望捕获的数据并将其存储在一个列中,以便在构建完成后查看:

@transform_df(
    ...
)
def some_transformation(some_input):
    logger.info("log output related to the overall query")
    
    @F.udf("integer")
    def custom_function(integer_input):
        return integer_input + 5
    
    @F.udf("string")
    def custom_log(integer_input):
        return "Original integer was %d before adding 5" % integer_input
    
    df = (
        some_input
        .withColumn("new_integer", custom_function(F.col("example_integer_col"))
        .withColumn("debugging", custom_log(F.col("example_integer_col"))
    )