运行 Pytest 与 DeltaTables 时出错
Error when running Pytest with DeltaTables
我在一家公司的 VDI 工作,出于安全原因,他们使用自己的人工制品。
目前,我正在编写单元测试来对从增量 table 中删除条目的函数执行测试。当我开始时,我收到了未解析依赖项的错误,因为我的 spark 会话配置为从 maven 加载 jar。我能够通过从 /opt/spark/jars 在本地加载这些 jar 来解决这个问题。现在我的代码如下所示:
class TestTransformation(unittest.TestCase):
@classmethod
def test_ksu_deletion(self):
self.spark = SparkSession.builder\
.appName('SPARK_DELETION')\
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
.config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.getOrCreate()
os.environ["KSU_DELETION_OBJECT"]="UNITTEST/"
deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
deltatable.delete(col("DATE") < get_current()
但是,我收到错误消息:
E py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
你知道这是什么原因造成的吗?我假设它与我配置 spark.sql.extions and/or spark.sql.catalog 的方式有关,但老实说,我是 Spark 的新手。
我将不胜感激任何提示。
非常感谢!
编辑:
我们正在使用 Spark 3.0.2 (Scala 2.12.10)。根据https://docs.delta.io/latest/releases.html,这应该是兼容的。除了 SparkSession,我将后续代码缩减为
df.spark.read.parquet(Path/to/file.snappy.parquet)
现在我收到了错误消息
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
正如我所说,我对 (Py)Spark 很陌生,所以请不要犹豫提及您认为完全显而易见的事情。
编辑 2:我在 运行 代码之前检查了我在 Shell 中导出的 Python 路径,我可以看到以下内容:
这会导致任何问题吗?我不明白为什么当 运行 pipenv 中的代码(使用 spark-submit)
时我没有收到此错误
您使用的 Delta lake 库版本似乎不兼容。 0.7.0 适用于 Spark 3.0,但您使用的是另一个版本 - 更低或更高。查阅 Delta releases page 以查找 Delta 版本与所需 Spark 版本之间的映射。
如果您使用的是 Spark 3.1 或 3.2,请考虑使用 delta-spark Python 包来安装所有必要的依赖项,因此您只需导入 DeltaTable
class。
更新:是的,这是因为版本冲突 - 您需要删除 delta-spark
和 pyspark
Python 包,并明确安装 pyspark==3.0.2
。
P.S。另外,看看 pytest-spark package that can simplify specification of configuration for all tests. You can find examples of it + Delta here.
我在一家公司的 VDI 工作,出于安全原因,他们使用自己的人工制品。 目前,我正在编写单元测试来对从增量 table 中删除条目的函数执行测试。当我开始时,我收到了未解析依赖项的错误,因为我的 spark 会话配置为从 maven 加载 jar。我能够通过从 /opt/spark/jars 在本地加载这些 jar 来解决这个问题。现在我的代码如下所示:
class TestTransformation(unittest.TestCase):
@classmethod
def test_ksu_deletion(self):
self.spark = SparkSession.builder\
.appName('SPARK_DELETION')\
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
.config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.getOrCreate()
os.environ["KSU_DELETION_OBJECT"]="UNITTEST/"
deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
deltatable.delete(col("DATE") < get_current()
但是,我收到错误消息:
E py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
你知道这是什么原因造成的吗?我假设它与我配置 spark.sql.extions and/or spark.sql.catalog 的方式有关,但老实说,我是 Spark 的新手。 我将不胜感激任何提示。
非常感谢!
编辑: 我们正在使用 Spark 3.0.2 (Scala 2.12.10)。根据https://docs.delta.io/latest/releases.html,这应该是兼容的。除了 SparkSession,我将后续代码缩减为
df.spark.read.parquet(Path/to/file.snappy.parquet)
现在我收到了错误消息
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
正如我所说,我对 (Py)Spark 很陌生,所以请不要犹豫提及您认为完全显而易见的事情。
编辑 2:我在 运行 代码之前检查了我在 Shell 中导出的 Python 路径,我可以看到以下内容:
您使用的 Delta lake 库版本似乎不兼容。 0.7.0 适用于 Spark 3.0,但您使用的是另一个版本 - 更低或更高。查阅 Delta releases page 以查找 Delta 版本与所需 Spark 版本之间的映射。
如果您使用的是 Spark 3.1 或 3.2,请考虑使用 delta-spark Python 包来安装所有必要的依赖项,因此您只需导入 DeltaTable
class。
更新:是的,这是因为版本冲突 - 您需要删除 delta-spark
和 pyspark
Python 包,并明确安装 pyspark==3.0.2
。
P.S。另外,看看 pytest-spark package that can simplify specification of configuration for all tests. You can find examples of it + Delta here.