将 Spark 数据帧插入分区 table
Insert Spark dataframe to partitioned table
我看过插入 Hive table 的方法,例如 insertInto(table_name, overwrite =True
,但我不知道如何处理下面的场景。
对于第一个 运行,像这样的数据帧需要保存在 table 中,按 'date_key' 分区。可以有一个或多个分区,例如 202201
和 202203
+---+----------+
| id| date_key|
+---+----------+
| 1|202201 |
| 2|202203 |
| 3|202201 |
+---+----------+
后面的运行,数据也是这样进来的,我想把新的数据追加到它们对应的分区,使用date_key
+---+----------+
| id| date_key|
+---+----------+
| 4|202204 |
| 5|202203 |
| 6|202204 |
+---+----------+
能否请您帮忙说明一下如何处理
- 如果在每个运行期间只有一个分区的新数据
- 如果在每个 运行 期间会有来自多个分区的新数据,就像上面的示例输入一样?
非常感谢您的帮助。如果我能更好地解释问题,请告诉我。
已编辑:
我无法使用 df.write.partitionBy("date_key").insertInto(table_name)
,因为出现错误提示 insertInto
不能与 partitionBy
一起使用。
如果 table 是外部分区 table 您可以使用以下代码将数据写出到外部分区 table
df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")
如果它是由 table 管理的配置单元,那么您可以简单地使用 saveAsTable
API
如下
df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")
在我这里的示例中,首先 运行 将创建新的分区 table data
。 c2
是分区列。
df1 = spark.createDataFrame([
(1, 'a'),
(2, 'b'),
], 'c1 int, c2 string')
df1.show()
df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
其次运行,你不需要任何花哨的东西,只需要append
和insertInto
。 Spark 知道你有 c2
是分区列并且它会正确,你不必通过 partitionBy
,
告诉它
df2 = spark.createDataFrame([
(1, 'a'),
(3, 'c'),
], 'c1 int, c2 string')
df2.show()
df2.write.mode('append').insertInto('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=c
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
我看过插入 Hive table 的方法,例如 insertInto(table_name, overwrite =True
,但我不知道如何处理下面的场景。
对于第一个 运行,像这样的数据帧需要保存在 table 中,按 'date_key' 分区。可以有一个或多个分区,例如 202201
和 202203
+---+----------+
| id| date_key|
+---+----------+
| 1|202201 |
| 2|202203 |
| 3|202201 |
+---+----------+
后面的运行,数据也是这样进来的,我想把新的数据追加到它们对应的分区,使用date_key
+---+----------+
| id| date_key|
+---+----------+
| 4|202204 |
| 5|202203 |
| 6|202204 |
+---+----------+
能否请您帮忙说明一下如何处理
- 如果在每个运行期间只有一个分区的新数据
- 如果在每个 运行 期间会有来自多个分区的新数据,就像上面的示例输入一样?
非常感谢您的帮助。如果我能更好地解释问题,请告诉我。
已编辑:
我无法使用 df.write.partitionBy("date_key").insertInto(table_name)
,因为出现错误提示 insertInto
不能与 partitionBy
一起使用。
如果 table 是外部分区 table 您可以使用以下代码将数据写出到外部分区 table
df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")
如果它是由 table 管理的配置单元,那么您可以简单地使用 saveAsTable
API
如下
df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")
在我这里的示例中,首先 运行 将创建新的分区 table data
。 c2
是分区列。
df1 = spark.createDataFrame([
(1, 'a'),
(2, 'b'),
], 'c1 int, c2 string')
df1.show()
df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
其次运行,你不需要任何花哨的东西,只需要append
和insertInto
。 Spark 知道你有 c2
是分区列并且它会正确,你不必通过 partitionBy
,
df2 = spark.createDataFrame([
(1, 'a'),
(3, 'c'),
], 'c1 int, c2 string')
df2.show()
df2.write.mode('append').insertInto('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=c
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet