How to CREATE TABLE USING delta with Spark 2.4.4?
This is Spark 2.4.4 and Delta Lake 0.5.0.
I'm trying to create a table using the delta data source and I seem to be missing something. Although the CREATE TABLE USING delta command works fine, neither is the table directory created nor does insertInto work.
The following CREATE TABLE USING delta worked fine, but insertInto failed.
scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show
scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
| t5| default| null| EXTERNAL| false|
+----+--------+-----------+---------+-----------+
scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).;
at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:341)
...
I thought I'd create the table with the columns defined, but that did not work either.
scala> sql("""
create table t6
(id LONG, name STRING)
USING delta
LOCATION '/tmp/delta'
""").show
org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan(Dataset.scala:194)
at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
... 54 elided
The OSS version of Delta does not have the SQL CREATE TABLE syntax yet. It will be implemented in a future release, together with Spark 3.0.
To create a Delta table you have to write out a DataFrame in Delta format. An example in Python is:
df.write.format("delta").save("/some/data/path")
Here is a link to the create table documentation for Python, Scala, and Java.
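In other words, on Spark 2.4.4 with Delta Lake 0.5.0 you create the table by writing data to a path, and you read it back by path rather than by table name. A minimal Scala sketch of that workaround (the path is illustrative):
// Spark 2.4.4 + Delta Lake 0.5.0: "create" the table by writing a DataFrame in delta format.
spark.range(5).write.format("delta").mode("overwrite").save("/some/data/path")
// Read it back by path; metastore-backed Delta SQL tables only arrive with Delta 0.7.0 / Spark 3.0.
spark.read.format("delta").load("/some/data/path").show()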
tl;dr CREATE TABLE USING delta is not supported by Spark before 3.0.0 or Delta Lake before 0.7.0.
Delta Lake 0.7.0 with Spark 3.0.0 (both just released) do support the CREATE TABLE SQL command.
Be sure to "install" Delta SQL using the spark.sql.catalog.spark_catalog configuration property set to org.apache.spark.sql.delta.catalog.DeltaCatalog.
$ ./bin/spark-submit \
--packages io.delta:delta-core_2.12:0.7.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
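The same two settings can also be applied programmatically when building the SparkSession (a sketch; the delta-core package must still be on the classpath, e.g. via --packages as above):
import org.apache.spark.sql.SparkSession

// Enable Delta's SQL extensions and catalog so that CREATE TABLE ... USING delta works.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()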
scala> spark.version
res0: String = 3.0.0
scala> sql("CREATE TABLE delta_101 (id LONG) USING delta").show
++
||
++
++
scala> spark.table("delta_101").show
+---+
| id|
+---+
+---+
scala> sql("DESCRIBE EXTENDED delta_101").show(truncate = false)
+----------------------------+---------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+---------------------------------------------------------+-------+
|id |bigint | |
| | | |
|# Partitioning | | |
|Not partitioned | | |
| | | |
|# Detailed Table Information| | |
|Name |default.delta_101 | |
|Location |file:/Users/jacek/dev/oss/spark/spark-warehouse/delta_101| |
|Provider |delta | |
|Table Properties |[] | |
+----------------------------+---------------------------------------------------------+-------+
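With that in place, the insert that failed on Spark 2.4.4 goes through as plain SQL (a sketch continuing the same shell session):
scala> sql("INSERT INTO delta_101 VALUES (1), (2)")
scala> spark.table("delta_101").show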
An example with pyspark 3.0.0 and delta 0.7.0:
print(f"LOCATION '{location}'")
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
CD_DEVICE INT,
FC_LOCAL_TIME TIMESTAMP,
CD_TYPE_DEVICE STRING,
CONSUMTION DOUBLE,
YEAR INT,
MONTH INT,
DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, FC_LOCAL_TIME)
LOCATION '{location}'
""")
where "location" is an HDFS directory to save the delta table to when running Spark in cluster mode.