在 PySpark 中获取序列文件格式文件的 HDFS 文件路径
Get HDFS file path in PySpark for files in sequence file format
我在 HDFS 上的数据是 Sequence 文件格式。我正在使用 PySpark (Spark 1.6) 并试图实现两件事:
数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持序列文件格式。
如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载像 yyyy/mm/dd/* 格式的数据。
感谢任何指点。
如果存储的类型与 SQL 类型兼容并且您使用 Spark 2.0,那将非常简单。导入 input_file_name
:
from pyspark.sql.functions import input_file_name
读取文件并转换为 DataFrame
:
df = sc.sequenceFile("/tmp/foo/").toDF()
添加文件名:
df.withColumn("input", input_file_name())
如果此解决方案不适用于您的情况,那么通用的解决方案是直接列出文件(对于 HDFS,您可以使用 hdfs3
库):
files = ...
一一阅读添加文件名:
def read(f):
"""Just to avoid problems with late binding"""
return sc.sequenceFile(f).map(lambda x: (f, x))
rdds = [read(f) for f in files]
联合:
sc.union(rdds)
我在 HDFS 上的数据是 Sequence 文件格式。我正在使用 PySpark (Spark 1.6) 并试图实现两件事:
数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持序列文件格式。
如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载像 yyyy/mm/dd/* 格式的数据。
感谢任何指点。
如果存储的类型与 SQL 类型兼容并且您使用 Spark 2.0,那将非常简单。导入 input_file_name
:
from pyspark.sql.functions import input_file_name
读取文件并转换为 DataFrame
:
df = sc.sequenceFile("/tmp/foo/").toDF()
添加文件名:
df.withColumn("input", input_file_name())
如果此解决方案不适用于您的情况,那么通用的解决方案是直接列出文件(对于 HDFS,您可以使用 hdfs3
库):
files = ...
一一阅读添加文件名:
def read(f):
"""Just to avoid problems with late binding"""
return sc.sequenceFile(f).map(lambda x: (f, x))
rdds = [read(f) for f in files]
联合:
sc.union(rdds)