PySpark Delta Table - generate symlink [java.lang.NoSuchMethodError]

My current situation:

To do this, I'm following the documentation: instructions and s3 setup

I'm working on a MacBook Pro for this small POC. Environment variables configured in my ~/.zshrc:

export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER_PYTHON=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1
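
Side note, not from the original setup: since SPARK_HOME points at the pyspark package installed by Poetry, a quick sanity check is whether the Python-side pyspark version and the version the JVM reports actually agree; a mismatch there is one common source of NoSuchMethodError. A minimal sketch:

# Sanity-check sketch (my own addition): the pyspark package on the Python side
# and the Spark version the JVM reports should match.
import findspark
findspark.init()  # picks up SPARK_HOME from the environment above

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("version_check").getOrCreate()
print("pyspark (python):", pyspark.__version__)
print("spark (jvm):", spark.version)
spark.stop()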

I've set up a small PySpark project in which I create the spark_session:

from pyspark.sql import SparkSession
import findspark
import boto3


def create_session() -> SparkSession:
    findspark.init()

    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()

    sparkContext = spark_session.sparkContext

    # Resolve AWS credentials from the "dev" profile via boto3
    boto_session = boto3.Session(profile_name="dev", region_name="eu-west-1")
    credentials = boto_session.get_credentials()

    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )

    # Pass the S3 settings down to the underlying Hadoop configuration
    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

    return spark_session
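
For reference, the same S3A settings can also be supplied while the session is being built, via the spark.hadoop.* prefix, instead of mutating hadoopConfiguration afterwards. This is only an equivalent sketch of the configuration above (it uses the standard fs.s3a.access.key / fs.s3a.secret.key property names), not a fix for the error:

# Sketch: equivalent S3A configuration passed through the builder.
# Assumes the same "dev" AWS profile and eu-west-1 region as above.
import boto3
from pyspark.sql import SparkSession

credentials = boto3.Session(profile_name="dev", region_name="eu-west-1").get_credentials()

spark_session = (
    SparkSession.builder.appName("delta_session")
    .master("local[*]")
    .config("spark.hadoop.fs.s3a.access.key", credentials.access_key)
    .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)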

Then I run:

spark_session = create_session()

from delta.tables import *

delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")

# This works
df = delta_table.toDF()
df.show(10)

# This fails
delta_table.generate("symlink_format_manifest")

  1. I am able to retrieve the Delta files and create the DataFrame, and everything looks fine.

  2. Then I try to call delta_table.generate, but I get this error:

Traceback (most recent call last):
  File "/run.py", line 33, in <module>
    delta_table.generate("symlink_format_manifest")
  File "/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 74, in generate
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "<poetry_python_path>/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.generate.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default()Z
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression(GenerateSymlinkManifest.scala:350)
    at scala.collection.TraversableLike.$anonfun$flatMap(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest(GenerateSymlinkManifest.scala:180)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration(GenerateSymlinkManifest.scala:365)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165)
    at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$adapted(DeltaGenerateCommand.scala:58)
    at org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54)
    at io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48)
    at io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45)
    at io.delta.tables.DeltaTable.generate(DeltaTable.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I invoke the application with:

    poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
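
The flags above can also be set programmatically when the session is built, which keeps everything in run.py. This is just a sketch using the same package coordinates and confs, and note that spark.jars.packages only takes effect when this process starts the JVM itself (e.g. plain python run.py), not when spark-submit has already created it:

# Sketch: programmatic equivalent of the --packages / --conf flags above.
from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder.appName("delta_session")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:0.8.0,"
        "com.amazonaws:aws-java-sdk-pom:1.11.375,"
        "org.apache.hadoop:hadoop-aws:3.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.delta.logStore.class",
        "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore",
    )
    .getOrCreate()
)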

What I've tried: at one point I also ran into this error when calling DeltaTable.forPath:

py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"

It looks to me like org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default()Z cannot be invoked for some reason. Is there something else I still need to install that I'm not finding?
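
For what it's worth, one diagnostic that can be run from the existing spark_session (a sketch, not part of the original code) is to print what the live session actually resolved, to see which Spark version and which Delta package end up loaded together:

# Diagnostic sketch: print the versions/packages the running session resolved.
print("spark version:", spark_session.version)
print("jars.packages:", spark_session.sparkContext.getConf().get("spark.jars.packages", "<not set>"))
print("sql.extensions:", spark_session.conf.get("spark.sql.extensions", "<not set>"))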

My pyproject.toml:

[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]

[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"

[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"

Many thanks in advance to anyone who may have run into the same problem.


Update

Based on Alex's comment, I solved this as follows:

You need to downgrade Spark to 3.0.2 to use Delta 0.8.0. Unfortunately, Spark 3.1.1 changed a lot of the internals that Delta relies on under the hood, which broke binary compatibility. Your specific problem is most likely caused by SPARK-32154, which changed the parameters of ScalaUDF (this line).
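
Based on that, a small guard like the following (my own sketch, assuming Delta 0.8.0 is only binary-compatible with the Spark 3.0.x line, as the comment above says) can fail fast before generate() is ever called:

# Sketch: fail fast if the installed pyspark is not the 3.0.x line that
# delta-core 0.8.0 was built against (see SPARK-32154 for the breaking change).
import pyspark

if not pyspark.__version__.startswith("3.0"):
    raise RuntimeError(
        f"pyspark {pyspark.__version__} is not binary-compatible with "
        "delta-core 0.8.0; pin pyspark to 3.0.2."
    )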