为什么我没有在我的 PySpark 代码中看到日志行,而我希望它们出现?

Why don't I see log lines in my PySpark code when I would expect them to appear?

我正在编写一些 PySpark 代码,我想在其中执行连接和其他操作,但我想在此阶段成功完成时进行记录。

为什么我没有看到它按我期望的顺序登录?即使我的工作仍在继续工作,看起来一切似乎都同时出现了......

了解 PySpark 在描述其将要执行的工作时使用的不同模型非常重要。

PySpark 在其查询评估中基本上是惰性的,将等待执行您请求的 任何 工作,直到绝对必要。

这意味着即使您描述了一个连接,记录了一些事情,然后继续另一个连接,第一个连接实际上还没有被执行。这是因为在正常情况下,它实际上不会开始执行任何操作,直到您在转换的最后调用 write_dataframe 方法。

一些例外情况是 .count().first().take(),以及任何强制 Spark 计算传入的 DataFrame 和 return 其结果的东西你。这意味着它将被迫在 .count() 之前实际评估查询,并且 return 在它进一步进入您的 Python 代码之前将其结果提供给您。

这正是出于性能原因,在您的代码中使用此类方法是一种反模式的确切原因,因为它们可能不会直接为您的最终数据集构建做出贡献;他们正在具体化可能不会导致您输出的摘要。

举个具体的例子,让我们考虑以下问题:

my_input_df = ...
my_other_df = ...

my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined!")
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)

I joined! 将在您的作业开始时打印到控制台,并且您的工作将继续执行具体化 joinwithColumn,就好像什么都没发生一样。 这是因为 Python 不会阻塞打印语句的主线程,因为它不会强制评估您的 DataFrame

但是,如果我将代码更改为以下内容:

my_input_df = ...
my_other_df = ...

my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
print("I joined {0} rows!".format(my_joined_df.count())
my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
my_output.write_dataframe(my_enriched_df)

然后我会在日志中观察 I joined X rows!,我的工作会 停止执行以在执行更多工作 之前具体化 count。这将意味着我的整体构建执行速度较慢,而且很可能 。这就是为什么在代码存储库中编写代码时,您会经常注意到此类方法的警告会停止执行输出。

我们通常鼓励用户编写代码来鼓励对 DataFrame 进行惰性评估,并避免停止您的工作以将内容打印到控制台。日志可能会超出您预期的顺序或降低您的计算速度。