在 Palantir Foundry 中,由于无法使用打印语句,我该如何调试 pyspark(或 pandas)UDF?
In Palantir Foundry, how do I debug pyspark (or pandas) UDFs since I can't use print statements?
在代码工作簿中,我可以使用 print
语句,这些语句出现在代码工作簿的输出部分(通常会出现错误)。这对 UDF 不起作用,在代码 Authoring/Repositories.
中也不起作用
我可以通过哪些方式调试我的 pyspark 代码,尤其是当我使用 UDF 时?
我将解释 3 个用于 pyspark 的调试工具(并可在 Foundry 中使用):
- 引发异常
- 运行 在本地作为 pandas 系列
- 记录并专门记录 UDF
引发异常
查看变量(尤其是 pandas UDF)的最简单、最快捷的方法是 Raise an Exception.
def my_compute_function(my_input):
interesting_variable = some_function(my_input) # Want to see result of this
raise ValueError(interesting_variable)
这通常比 reading/writing DataFrames 更容易,因为:
- 可以轻松地插入 raise 语句,而不会弄乱转换的 return 值或其他逻辑
- 不需要为你的调试语句定义一个有效的模式而乱七八糟
缺点是它会停止代码的执行。
运行 在本地作为 pandas 系列
如果您对 Pandas 更有经验,您可以使用少量数据样本,并且 运行 您可以在其中进行调试。
我以前使用的一些技术不仅仅是按行数对数据进行下采样,而是过滤数据以代表我的工作。例如,如果我正在编写一个算法来确定航班延误,我会过滤到特定日期飞往特定机场的所有航班。这样我就可以对样本进行整体测试。
日志记录
代码存储库使用 Python 的内置 logging library。这在网上被广泛记录,并允许您控制日志记录级别(错误、警告、信息)以便于过滤。
日志输出出现在输出数据集的日志文件和构建的驱动程序日志中(数据集 -> 详细信息 -> 文件 -> 日志文件,以及构建 -> 构建 -> 作业状态日志;select“驱动程序日志”。
这将允许您在日志中查看记录的信息(构建完成后),但不适用于 UDF。
登录 UDF
UDF 完成的工作是由执行程序而不是驱动程序完成的,Spark 从 top-level 驱动程序进程捕获日志输出。如果您在 PySpark 查询中使用 UDF 并且需要记录数据,请创建并调用第二个 UDF,该 UDF return 包含您希望捕获的数据并将其存储在一个列中,以便在构建完成后查看:
@transform_df(
...
)
def some_transformation(some_input):
logger.info("log output related to the overall query")
@F.udf("integer")
def custom_function(integer_input):
return integer_input + 5
@F.udf("string")
def custom_log(integer_input):
return "Original integer was %d before adding 5" % integer_input
df = (
some_input
.withColumn("new_integer", custom_function(F.col("example_integer_col"))
.withColumn("debugging", custom_log(F.col("example_integer_col"))
)
在代码工作簿中,我可以使用 print
语句,这些语句出现在代码工作簿的输出部分(通常会出现错误)。这对 UDF 不起作用,在代码 Authoring/Repositories.
我可以通过哪些方式调试我的 pyspark 代码,尤其是当我使用 UDF 时?
我将解释 3 个用于 pyspark 的调试工具(并可在 Foundry 中使用):
- 引发异常
- 运行 在本地作为 pandas 系列
- 记录并专门记录 UDF
引发异常
查看变量(尤其是 pandas UDF)的最简单、最快捷的方法是 Raise an Exception.
def my_compute_function(my_input):
interesting_variable = some_function(my_input) # Want to see result of this
raise ValueError(interesting_variable)
这通常比 reading/writing DataFrames 更容易,因为:
- 可以轻松地插入 raise 语句,而不会弄乱转换的 return 值或其他逻辑
- 不需要为你的调试语句定义一个有效的模式而乱七八糟
缺点是它会停止代码的执行。
运行 在本地作为 pandas 系列
如果您对 Pandas 更有经验,您可以使用少量数据样本,并且 运行
我以前使用的一些技术不仅仅是按行数对数据进行下采样,而是过滤数据以代表我的工作。例如,如果我正在编写一个算法来确定航班延误,我会过滤到特定日期飞往特定机场的所有航班。这样我就可以对样本进行整体测试。
日志记录
代码存储库使用 Python 的内置 logging library。这在网上被广泛记录,并允许您控制日志记录级别(错误、警告、信息)以便于过滤。
日志输出出现在输出数据集的日志文件和构建的驱动程序日志中(数据集 -> 详细信息 -> 文件 -> 日志文件,以及构建 -> 构建 -> 作业状态日志;select“驱动程序日志”。
这将允许您在日志中查看记录的信息(构建完成后),但不适用于 UDF。
登录 UDF
UDF 完成的工作是由执行程序而不是驱动程序完成的,Spark 从 top-level 驱动程序进程捕获日志输出。如果您在 PySpark 查询中使用 UDF 并且需要记录数据,请创建并调用第二个 UDF,该 UDF return 包含您希望捕获的数据并将其存储在一个列中,以便在构建完成后查看:
@transform_df(
...
)
def some_transformation(some_input):
logger.info("log output related to the overall query")
@F.udf("integer")
def custom_function(integer_input):
return integer_input + 5
@F.udf("string")
def custom_log(integer_input):
return "Original integer was %d before adding 5" % integer_input
df = (
some_input
.withColumn("new_integer", custom_function(F.col("example_integer_col"))
.withColumn("debugging", custom_log(F.col("example_integer_col"))
)