为什么 pyspark 中的 S3 目录源的 input_file_name() 为空?
Why is input_file_name() empty for S3 catalog sources in pyspark?
我正在尝试获取通过 AWS Glue 中的 S3 数据目录加载的每个文件的输入文件名(或路径)。
我 在一些地方 input_file_name()
应该提供此信息(尽管警告说这仅在调用 from_catalog
而不是 from_options
时有效,这我相信我是!)。
所以下面的代码看起来应该可以工作,但每个 input_file_name
.
总是 returns 空值
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
database='database_name',
table_name='table_name',
transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
结果输出:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
是否有另一种方法可以创建确保填充 input_file_name()
的框架?我现在尝试通过 create_dynamic_frame.from_catalog
、create_dynamic_frame.from_options
和 getSource().getFrame()
构建一个源帧,但我得到的结果相同,每个 input_file_name
列都是空的。
我认为使用 groupFiles
选项时这是不可能的,因为 Glue 在幕后连接文件以创建最佳输入数量。因此,鉴于原始文件路径不再是直接输入,input_file_name
的概念在此上下文中没有意义。
但是,从某种意义上说,即使对于少于 50,000 个文件的输入,不明确禁用该选项也会触发 Glue 根据文件大小连接输入,因此文档会产生轻微的误导。在我们的例子中,我们有数千个微小的输入文件(<1 MB)导致了这种行为。
您可以通过显式禁用分组轻松验证这一点(请注意,这将对与我们类似的场景产生严重的性能影响:
ds_s3 = self.gc.getSource(
connection_type='s3',
paths=paths,
groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
当然,最好不要依赖于输入状态或上下文,所以我们最终编写了一个在 S3 PUT
上触发的 AWS Lambda,它写入元数据(包括文件名和路径)进入文件本身。
我也补充一下我的经验,在我的例子中,我收到了一个空结果,因为调用了 cache()
方法。
例如:
import pyspark.sql.functions as F
df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())
df.show()
我收到
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
但是如果我删除行 df.cache()
,列 input_file_name
会正确显示输入文件名。
解决方法可能是在缓存之前调用 F.input_file_name()
。
我遇到了与 Vzzarr 相同的问题。当我在数据框上调用 cache
后创建 input_file_path
列时,文件路径为空。但是当我在调用 cache
之前创建 input_file_path
时,它起作用了。
我发现了另一个奇怪的行为。当我在 input_file_path
之前添加一个 limit()
时,id 也不起作用。
此代码的文件名为空
df_raw = (spark
.read
.option("delimiter", ";")
.csv(filepath,header = "true",inferSchema=True)
.select("COL1","COL2")
.limit(1000)
)
df_transform = (df_raw
.withColumn("filename", f.input_file_name())
)
此代码有效
df_raw = (spark
.read
.option("delimiter", ";")
.csv(filepath,header = "true",inferSchema=True)
.select("COL1","COL2")
)
df_transform = (df_raw
.withColumn("filename", f.input_file_name())
.limit(1000)
)
我花了一些时间才弄明白,因为我试图通过只读取几行来加快调试速度。
这件事发生在我身上,因为我的 databricks 集群 (7.6) 有一个已弃用的运行时,我在 7.3 LTS 上尝试了它,但效果很好。 :)
我正在尝试获取通过 AWS Glue 中的 S3 数据目录加载的每个文件的输入文件名(或路径)。
我 input_file_name()
应该提供此信息(尽管警告说这仅在调用 from_catalog
而不是 from_options
时有效,这我相信我是!)。
所以下面的代码看起来应该可以工作,但每个 input_file_name
.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
database='database_name',
table_name='table_name',
transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
结果输出:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
是否有另一种方法可以创建确保填充 input_file_name()
的框架?我现在尝试通过 create_dynamic_frame.from_catalog
、create_dynamic_frame.from_options
和 getSource().getFrame()
构建一个源帧,但我得到的结果相同,每个 input_file_name
列都是空的。
我认为使用 groupFiles
选项时这是不可能的,因为 Glue 在幕后连接文件以创建最佳输入数量。因此,鉴于原始文件路径不再是直接输入,input_file_name
的概念在此上下文中没有意义。
但是,从某种意义上说,即使对于少于 50,000 个文件的输入,不明确禁用该选项也会触发 Glue 根据文件大小连接输入,因此文档会产生轻微的误导。在我们的例子中,我们有数千个微小的输入文件(<1 MB)导致了这种行为。
您可以通过显式禁用分组轻松验证这一点(请注意,这将对与我们类似的场景产生严重的性能影响:
ds_s3 = self.gc.getSource(
connection_type='s3',
paths=paths,
groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
当然,最好不要依赖于输入状态或上下文,所以我们最终编写了一个在 S3 PUT
上触发的 AWS Lambda,它写入元数据(包括文件名和路径)进入文件本身。
我也补充一下我的经验,在我的例子中,我收到了一个空结果,因为调用了 cache()
方法。
例如:
import pyspark.sql.functions as F
df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())
df.show()
我收到
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
但是如果我删除行 df.cache()
,列 input_file_name
会正确显示输入文件名。
解决方法可能是在缓存之前调用 F.input_file_name()
。
我遇到了与 Vzzarr 相同的问题。当我在数据框上调用 cache
后创建 input_file_path
列时,文件路径为空。但是当我在调用 cache
之前创建 input_file_path
时,它起作用了。
我发现了另一个奇怪的行为。当我在 input_file_path
之前添加一个 limit()
时,id 也不起作用。
此代码的文件名为空
df_raw = (spark
.read
.option("delimiter", ";")
.csv(filepath,header = "true",inferSchema=True)
.select("COL1","COL2")
.limit(1000)
)
df_transform = (df_raw
.withColumn("filename", f.input_file_name())
)
此代码有效
df_raw = (spark
.read
.option("delimiter", ";")
.csv(filepath,header = "true",inferSchema=True)
.select("COL1","COL2")
)
df_transform = (df_raw
.withColumn("filename", f.input_file_name())
.limit(1000)
)
我花了一些时间才弄明白,因为我试图通过只读取几行来加快调试速度。
这件事发生在我身上,因为我的 databricks 集群 (7.6) 有一个已弃用的运行时,我在 7.3 LTS 上尝试了它,但效果很好。 :)