未能找到数据源:Python 环境中的增量
Failed to find data source: delta in Python environment
以下:https://docs.delta.io/latest/quick-start.html#python
我已经安装了 delta-spark 和 运行:
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()
然而当我运行:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
错误状态:增量未被识别
&如果我运行
DeltaTable.isDeltaTable(spark, "packages/tests/streaming/data")
它指出:类型错误:'JavaPackage' 对象不可调用
似乎我可以在本地 运行 这些命令(例如单元测试)而无需 Maven 或 运行在 pyspark shell 中使用它?最好看看我是否缺少依赖项?
您可以使用 pip install delta-spark
安装 delta-spark
PyPi 包(它也会拉取 pyspark),然后参考它。
或者您可以添加将获取 Delta 包的配置选项。是 .config("spark.jars.packages", "io.delta:delta-core_2.12:<delta-version>")
。对于 Spark 3.1,Delta 版本是 1.0.0(有关详细信息,请参阅 releases mapping docs)。
我有一个 example of using Delta tables in unit tests(请注意,导入语句在函数定义中,因为 Delta 包是动态加载的):
import pyspark
import pyspark.sql
import pytest
import shutil
from pyspark.sql import SparkSession
delta_dir_name = "/tmp/delta-table"
@pytest.fixture
def delta_setup(spark_session):
data = spark_session.range(0, 5)
data.write.format("delta").save(delta_dir_name)
yield data
shutil.rmtree(delta_dir_name, ignore_errors=True)
def test_delta(spark_session, delta_setup):
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark_session, delta_dir_name)
hist = deltaTable.history()
assert hist.count() == 1
环境是initialized via pytest-spark:
[pytest]
filterwarnings =
ignore::DeprecationWarning
spark_options =
spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.jars.packages: io.delta:delta-core_2.12:1.0.0
spark.sql.catalogImplementation: in-memory
以下:https://docs.delta.io/latest/quick-start.html#python
我已经安装了 delta-spark 和 运行:
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()
然而当我运行:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
错误状态:增量未被识别
&如果我运行
DeltaTable.isDeltaTable(spark, "packages/tests/streaming/data")
它指出:类型错误:'JavaPackage' 对象不可调用
似乎我可以在本地 运行 这些命令(例如单元测试)而无需 Maven 或 运行在 pyspark shell 中使用它?最好看看我是否缺少依赖项?
您可以使用 pip install delta-spark
安装 delta-spark
PyPi 包(它也会拉取 pyspark),然后参考它。
或者您可以添加将获取 Delta 包的配置选项。是 .config("spark.jars.packages", "io.delta:delta-core_2.12:<delta-version>")
。对于 Spark 3.1,Delta 版本是 1.0.0(有关详细信息,请参阅 releases mapping docs)。
我有一个 example of using Delta tables in unit tests(请注意,导入语句在函数定义中,因为 Delta 包是动态加载的):
import pyspark
import pyspark.sql
import pytest
import shutil
from pyspark.sql import SparkSession
delta_dir_name = "/tmp/delta-table"
@pytest.fixture
def delta_setup(spark_session):
data = spark_session.range(0, 5)
data.write.format("delta").save(delta_dir_name)
yield data
shutil.rmtree(delta_dir_name, ignore_errors=True)
def test_delta(spark_session, delta_setup):
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark_session, delta_dir_name)
hist = deltaTable.history()
assert hist.count() == 1
环境是initialized via pytest-spark:
[pytest]
filterwarnings =
ignore::DeprecationWarning
spark_options =
spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.jars.packages: io.delta:delta-core_2.12:1.0.0
spark.sql.catalogImplementation: in-memory