使用文件名创建变量 - PySpark
Using filenames to create variable - PySpark
我有一个文件被删除的文件夹(每天、每周),我需要添加年份和 week/day,它们在文件名中以一致的格式作为变量添加到我的数据框中。前缀可以更改(例如 sales_report
、cash_flow
等),但最后一个字符始终为 YYYY_WW.csv
。
例如,对于每周文件,我可以为每个文件手动执行:
from pyspark.sql.functions import lit
df = spark.read.load('my_folder/sales_report_2019_12.csv', format="csv").withColumn("sales_year", lit(2019)).withColumn("sales_week", lit(12))
我想使用从文件名右侧开始计数的子字符串函数来解析 12
和 2019
。如果我能够解析这些变量的文件名,那么我就可以使用 df = spark.read.load('my_folder/sales_report_*.csv', format="csv")
等通配符读取文件夹中的所有文件,这将大大简化我的代码。
您可以尝试以下方法:
import glob
listfiles = glob.glob('my_folder/sales_report_*.csv')
for files in listfiles:
weekyear = c.split('_',2)[-1].split('_')
week = weekyear[1]
year = weekyear[0]
df = spark.read.load('files', format="csv").withColumn("sales_year", lit(year)).withColumn("sales_week", lit(week))
您可以使用 input_file_name()
列和一些字符串函数(如 regexp_extract
和 substring_index
:
轻松地从文件名中提取它
df = spark.read.load('my_folder/*.csv', format="csv")
df = df.withColumn("year_week", regexp_extract(input_file_name(), "\d{4}_\d{1,2}"))\
.withColumn("sales_year", substring_index(col("year_week"), "_", 1))\
.withColumn("sales_week", substring_index(col("year_week"), "_", -1))\
.drop("year_week")
我有一个文件被删除的文件夹(每天、每周),我需要添加年份和 week/day,它们在文件名中以一致的格式作为变量添加到我的数据框中。前缀可以更改(例如 sales_report
、cash_flow
等),但最后一个字符始终为 YYYY_WW.csv
。
例如,对于每周文件,我可以为每个文件手动执行:
from pyspark.sql.functions import lit
df = spark.read.load('my_folder/sales_report_2019_12.csv', format="csv").withColumn("sales_year", lit(2019)).withColumn("sales_week", lit(12))
我想使用从文件名右侧开始计数的子字符串函数来解析 12
和 2019
。如果我能够解析这些变量的文件名,那么我就可以使用 df = spark.read.load('my_folder/sales_report_*.csv', format="csv")
等通配符读取文件夹中的所有文件,这将大大简化我的代码。
您可以尝试以下方法:
import glob
listfiles = glob.glob('my_folder/sales_report_*.csv')
for files in listfiles:
weekyear = c.split('_',2)[-1].split('_')
week = weekyear[1]
year = weekyear[0]
df = spark.read.load('files', format="csv").withColumn("sales_year", lit(year)).withColumn("sales_week", lit(week))
您可以使用 input_file_name()
列和一些字符串函数(如 regexp_extract
和 substring_index
:
df = spark.read.load('my_folder/*.csv', format="csv")
df = df.withColumn("year_week", regexp_extract(input_file_name(), "\d{4}_\d{1,2}"))\
.withColumn("sales_year", substring_index(col("year_week"), "_", 1))\
.withColumn("sales_week", substring_index(col("year_week"), "_", -1))\
.drop("year_week")