pyspark delta lake 优化 - 无法解析 SQL
pyspark delta lake optimize - fails to parse SQL
我使用 spark 3.x 和 delta 0 创建了一个 delta table。7.x:
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("tmp/delta-table")
# add some more files
data = spark.range(20, 100)
data.write.format("delta").mode("append").save("tmp/delta-table")
df = spark.read.format("delta").load("tmp/delta-table")
df.show()
现在日志中生成了相当多的文件(parquet 文件太小了)。
%ls tmp/delta-table
我想压缩它们:
df.createGlobalTempView("my_delta_table")
spark.sql("OPTIMIZE my_delta_table ZORDER BY (id)")
失败:
ParseException:
mismatched input 'OPTIMIZE' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)
== SQL ==
OPTIMIZE my_delta_table ZORDER BY (id)
^^^
问题:
- 如何在查询失败的情况下使其工作(优化)
- 有没有比调用基于文本的SQL更原生的API?
通知:
spark is started like this:
import pyspark
from pyspark.sql import SparkSession
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
from delta.tables import *
OPTIMIZE
在 OSS Delta Lake 中不可用。如果您想压缩文件,可以按照 Compact files 部分中的说明进行操作。如果您想使用 ZORDER
,目前您需要使用 Databricks Runtime。
-- 编辑--
如果您在本地 运行 Delta,则意味着您必须使用 OSS Delta Lake。优化命令仅适用于 Databricks Delta Lake。在 OSS 中做文件压缩,你可以这样做 - https://docs.delta.io/latest/best-practices.html#compact-files
我使用 spark 3.x 和 delta 0 创建了一个 delta table。7.x:
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("tmp/delta-table")
# add some more files
data = spark.range(20, 100)
data.write.format("delta").mode("append").save("tmp/delta-table")
df = spark.read.format("delta").load("tmp/delta-table")
df.show()
现在日志中生成了相当多的文件(parquet 文件太小了)。
%ls tmp/delta-table
我想压缩它们:
df.createGlobalTempView("my_delta_table")
spark.sql("OPTIMIZE my_delta_table ZORDER BY (id)")
失败:
ParseException:
mismatched input 'OPTIMIZE' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)
== SQL ==
OPTIMIZE my_delta_table ZORDER BY (id)
^^^
问题:
- 如何在查询失败的情况下使其工作(优化)
- 有没有比调用基于文本的SQL更原生的API?
通知:
spark is started like this:
import pyspark
from pyspark.sql import SparkSession
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
from delta.tables import *
OPTIMIZE
在 OSS Delta Lake 中不可用。如果您想压缩文件,可以按照 Compact files 部分中的说明进行操作。如果您想使用 ZORDER
,目前您需要使用 Databricks Runtime。
-- 编辑--
如果您在本地 运行 Delta,则意味着您必须使用 OSS Delta Lake。优化命令仅适用于 Databricks Delta Lake。在 OSS 中做文件压缩,你可以这样做 - https://docs.delta.io/latest/best-practices.html#compact-files