如何在 pyspark 数据帧读取方法中包含分区列
How to include partitioned column in pyspark dataframe read method
我正在从 parquet 文件编写基于 Avro 文件的文件。我已阅读如下文件:
读取数据
dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
.load("/Users/rashmik/flight-time.parquet")
写入数据
我已经用 Avro 格式编写了如下文件:
dfParquetRePartitioned.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()
正如预期的那样,我得到了按 OP_CARRIER
分区的数据。
从特定分区读取 Avro 分区数据
在另一项工作中,我需要从上述工作的输出中读取数据,即从 datasink/avro
目录中读取数据。我正在使用以下代码读取 datasink/avro
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load("datasink/avro/OP_CARRIER=AA")
它成功读取数据,但正如预期的那样,OP_CARRIER
列在 dfAvro
数据框中不可用,因为它是第一个作业的分区列。现在我的要求是在第二个数据帧中也包括 OP_CARRIER
字段,即在 dfAvro
中。有人可以帮我解决这个问题吗?
我指的是来自 spark document 的文档,但我无法找到相关信息。任何指针都会非常有帮助。
您使用不同的别名复制相同的列值。
dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()
这会给你想要的。但别名不同。
或者你也可以在阅读时做。如果位置是动态的,那么您可以轻松地追加该列。
path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=")
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0], lit(newcol[1]))
如果该值是静态的,则在读取数据期间更容易添加它。
我正在从 parquet 文件编写基于 Avro 文件的文件。我已阅读如下文件:
读取数据
dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
.load("/Users/rashmik/flight-time.parquet")
写入数据
我已经用 Avro 格式编写了如下文件:
dfParquetRePartitioned.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()
正如预期的那样,我得到了按 OP_CARRIER
分区的数据。
从特定分区读取 Avro 分区数据
在另一项工作中,我需要从上述工作的输出中读取数据,即从 datasink/avro
目录中读取数据。我正在使用以下代码读取 datasink/avro
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load("datasink/avro/OP_CARRIER=AA")
它成功读取数据,但正如预期的那样,OP_CARRIER
列在 dfAvro
数据框中不可用,因为它是第一个作业的分区列。现在我的要求是在第二个数据帧中也包括 OP_CARRIER
字段,即在 dfAvro
中。有人可以帮我解决这个问题吗?
我指的是来自 spark document 的文档,但我无法找到相关信息。任何指针都会非常有帮助。
您使用不同的别名复制相同的列值。
dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()
这会给你想要的。但别名不同。 或者你也可以在阅读时做。如果位置是动态的,那么您可以轻松地追加该列。
path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=")
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0], lit(newcol[1]))
如果该值是静态的,则在读取数据期间更容易添加它。