pyspark with hive - can't properly create a partitioned table from a dataframe and save it
I'm trying to convert json files to parquet with only a few transformations (adding a date column), but I need to partition this data before saving it to parquet.
I'm hitting a wall here.
Here is how the table is created:
df_temp = spark.read.json(data_location) \
    .filter(
        cond3
    )
df_temp = df_temp.withColumn("date", fn.to_date(fn.lit(today.strftime("%Y-%m-%d"))))
df_temp.createOrReplaceTempView("{}_tmp".format("duration_small"))
spark.sql("CREATE TABLE IF NOT EXISTS {1} LIKE {0}_tmp LOCATION '{2}/{1}'".format("duration_small","duration", warehouse_location))
spark.sql("DESC {}".format("duration"))
Then the save, after the transformations:
df_final.write.mode("append").format("parquet").partitionBy("customer_id", "date").saveAsTable('duration')
But this generates the following error:
pyspark.sql.utils.AnalysisException: '\nSpecified partitioning does not match that of the existing table default.duration.\nSpecified partition columns: [customer_id, date]\nExisting partition columns: []\n ;'
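Which makes sense in hindsight: the table created via CREATE TABLE ... LIKE has no partition spec at all, so the partitioned append is refused. A quick way to confirm this (just a sketch, same table name as above):
# For a partitioned table, DESCRIBE EXTENDED would include a "# Partition Information"
# section; for the table created with LIKE it does not.
spark.sql("DESCRIBE EXTENDED duration").show(100, truncate=False)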
The schema is:
root
|-- action_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- duration: long (nullable = true)
|-- initial_value: string (nullable = true)
|-- item_class: string (nullable = true)
|-- set_value: string (nullable = true)
|-- start_time: string (nullable = true)
|-- stop_time: string (nullable = true)
|-- undo_event: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- date: date (nullable = true)
So I tried to change the create table to:
spark.sql("CREATE TABLE IF NOT EXISTS {1} LIKE {0}_tmp PARTITIONED BY (customer_id, date) LOCATION '{2}/{1}'".format("duration_small","duration", warehouse_location))
But this gives an error like:
...mismatched input 'PARTITIONED' expecting ...
So I found out that PARTITIONED BY doesn't work together with LIKE, and I'm running out of ideas.
If I use USING instead of LIKE, I get the error:
pyspark.sql.utils.AnalysisException: 'It is not allowed to specify partition columns when the table schema is not defined. When the table schema is not provided, schema and partition columns will be inferred.;'
How can I add the partitioning when creating the table?
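That second error seems to point at declaring the schema explicitly instead of relying on LIKE, so that USING PARQUET and PARTITIONED BY can go together. A rough sketch of what I mean (column types copied from the schema above, only some of the non-partition columns shown, not fully tested):
# Sketch: explicit schema so PARTITIONED BY is accepted alongside USING PARQUET.
spark.sql("""
    CREATE TABLE IF NOT EXISTS duration (
        action_id STRING,
        duration BIGINT,
        start_time STRING,
        stop_time STRING,
        customer_id STRING,
        date DATE
    )
    USING PARQUET
    PARTITIONED BY (customer_id, date)
    LOCATION '{0}/duration'
""".format(warehouse_location))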
PS - once the table schema is defined with its partitions, I'd like to simply use:
df_final.write.format("parquet").insertInto('duration')
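(One thing to keep in mind with insertInto, as far as I understand it: it matches columns by position rather than by name, and a table partitioned like this stores the partition columns last in its schema, so the DataFrame usually needs to be reordered first. A small sketch:)
# insertInto is positional: put the partition columns (customer_id, date) last
# so the DataFrame lines up with the table layout.
ordered_cols = [c for c in df_final.columns if c not in ("customer_id", "date")] + ["customer_id", "date"]
df_final.select(*ordered_cols).write.insertInto("duration")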
I finally figured out how to do it with spark.
df_temp = spark.read.json(...)  # same read as above
df_temp.createOrReplaceTempView("{}_tmp".format("duration_small"))
spark.sql("""
CREATE TABLE IF NOT EXISTS {1}
USING PARQUET
PARTITIONED BY (customer_id, date)
LOCATION '{2}/{1}' AS SELECT * FROM {0}_tmp
""".format("duration_small","duration", warehouse_location))
spark.sql("DESC {}".format("duration"))
df_temp.write.mode("append").partitionBy("customer_id", "date").saveAsTable('duration')
I don't know why, but when I couldn't use insertInto, it would use a weird customer_id out of nowhere and wouldn't append the different dates.
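That would be consistent with the positional matching mentioned above: if the DataFrame columns are not in the table's order, values end up under the wrong columns, which can look exactly like a bogus customer_id. To check which partitions each write actually produced (same table name as above):
# List the partitions Spark registered after each append / insert.
spark.sql("SHOW PARTITIONS duration").show(truncate=False)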