运行 Pytest 与 DeltaTables 时出错

Error when running Pytest with DeltaTables

我在一家公司的 VDI 工作,出于安全原因,他们使用自己的人工制品。 目前,我正在编写单元测试来对从增量 table 中删除条目的函数执行测试。当我开始时,我收到了未解析依赖项的错误,因为我的 spark 会话配置为从 maven 加载 jar。我能够通过从 /opt/spark/jars 在本地加载这些 jar 来解决这个问题。现在我的代码如下所示:

class TestTransformation(unittest.TestCase):
    @classmethod
    def test_ksu_deletion(self):
        self.spark = SparkSession.builder\
                        .appName('SPARK_DELETION')\
                        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
                        .config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
                        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
                        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
                        .getOrCreate()
        os.environ["KSU_DELETION_OBJECT"]="UNITTEST/"
        deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
        deltatable.delete(col("DATE") < get_current()

但是,我收到错误消息:

E     py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E     : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V

你知道这是什么原因造成的吗?我假设它与我配置 spark.sql.extions and/or spark.sql.catalog 的方式有关,但老实说,我是 Spark 的新手。 我将不胜感激任何提示。

非常感谢!

编辑: 我们正在使用 Spark 3.0.2 (Scala 2.12.10)。根据https://docs.delta.io/latest/releases.html,这应该是兼容的。除了 SparkSession,我将后续代码缩减为

df.spark.read.parquet(Path/to/file.snappy.parquet)

现在我收到了错误消息

java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class

正如我所说,我对 (Py)Spark 很陌生,所以请不要犹豫提及您认为完全显而易见的事情。

编辑 2:我在 运行 代码之前检查了我在 Shell 中导出的 Python 路径,我可以看到以下内容: 这会导致任何问题吗?我不明白为什么当 运行 pipenv 中的代码(使用 spark-submit)

时我没有收到此错误

您使用的 Delta lake 库版本似乎不兼容。 0.7.0 适用于 Spark 3.0,但您使用的是另一个版本 - 更低或更高。查阅 Delta releases page 以查找 Delta 版本与所需 Spark 版本之间的映射。

如果您使用的是 Spark 3.1 或 3.2,请考虑使用 delta-spark Python 包来安装所有必要的依赖项,因此您只需导入 DeltaTable class。

更新:是的,这是因为版本冲突 - 您需要删除 delta-sparkpyspark Python 包,并明确安装 pyspark==3.0.2

P.S。另外,看看 pytest-spark package that can simplify specification of configuration for all tests. You can find examples of it + Delta here.