为什么 pyspark 中的 S3 目录源的 input_file_name() 为空?

Why is input_file_name() empty for S3 catalog sources in pyspark?

我正在尝试获取通过 AWS Glue 中的 S3 数据目录加载的每个文件的输入文件名(或路径)。

在一些地方 input_file_name() 应该提供此信息(尽管警告说这仅在调用 from_catalog 而不是 from_options 时有效,这我相信我是!)。

所以下面的代码看起来应该可以工作,但每个 input_file_name.

总是 returns 空值
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name


args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session


job = Job(gc)
job.init(args['JOB_NAME'], args)


# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)

df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)

结果输出:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

是否有另一种方法可以创建确保填充 input_file_name() 的框架?我现在尝试通过 create_dynamic_frame.from_catalogcreate_dynamic_frame.from_optionsgetSource().getFrame() 构建一个源帧,但我得到的结果相同,每个 input_file_name 列都是空的。

我认为使用 groupFiles 选项时这是不可能的,因为 Glue 在幕后连接文件以创建最佳输入数量。因此,鉴于原始文件路径不再是直接输入,input_file_name 的概念在此上下文中没有意义。

但是,从某种意义上说,即使对于少于 50,000 个文件的输入,不明确禁用该选项也会触发 Glue 根据文件大小连接输入,因此文档会产生轻微的误导。在我们的例子中,我们有数千个微小的输入文件(<1 MB)导致了这种行为。

您可以通过显式禁用分组轻松验证这一点(请注意,这将对与我们类似的场景产生严重的性能影响:

ds_s3 = self.gc.getSource(
    connection_type='s3',
    paths=paths,
    groupFiles='none',
)
fm_s3 = ds_s3.getFrame()

当然,最好不要依赖于输入状态或上下文,所以我们最终编写了一个在 S3 PUT 上触发的 AWS Lambda,它写入元数据(包括文件名和路径)进入文件本身。

我也补充一下我的经验,在我的例子中,我收到了一个空结果,因为调用了 cache() 方法。

例如:

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())

df.show()

我收到

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

但是如果我删除行 df.cache(),列 input_file_name 会正确显示输入文件名。

解决方法可能是在缓存之前调用 F.input_file_name()

我遇到了与 Vzzarr 相同的问题。当我在数据框上调用 cache 后创建 input_file_path 列时,文件路径为空。但是当我在调用 cache 之前创建 input_file_path 时,它起作用了。

我发现了另一个奇怪的行为。当我在 input_file_path 之前添加一个 limit() 时,id 也不起作用。

此代码的文件名为空

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath,header = "true",inferSchema=True)      
          .select("COL1","COL2")
          .limit(1000)
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())
          )

此代码有效

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath,header = "true",inferSchema=True)      
          .select("COL1","COL2")
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())
          .limit(1000)
          )

我花了一些时间才弄明白,因为我试图通过只读取几行来加快调试速度。

这件事发生在我身上,因为我的 databricks 集群 (7.6) 有一个已弃用的运行时,我在 7.3 LTS 上尝试了它,但效果很好。 :)