PySpark Delta Table - generate symlink [java.lang.NoSuchMethodError]
My current situation:
- The Delta table is located in S3
- I want to query this table through Athena

I am using Spark version 3.1.1 and Hadoop 3.2.0. To do this, I need to follow the docs: instructions and s3 setup.

I am using a MacBook Pro, with the environment variables for my small POC configured in my ~/.zshrc:
export PYSPARK_PYTHON=<poetry_python_path>/bin/python3
export PYSPARK_DRIVER=<poetry_python_path>/bin/python3
export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"
export SPARK_HOME=<poetry_python_path>/site-packages/pyspark
export PYARROW_IGNORE_TIMEZONE=1
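(As a quick sanity check, not part of the original setup, just a sketch: printing what the driver process actually sees for these variables helps catch a stale shell or the wrong poetry environment.)

import os
import findspark

findspark.init()  # resolves Spark from SPARK_HOME
print("JAVA_HOME      =", os.environ.get("JAVA_HOME"))
print("SPARK_HOME     =", os.environ.get("SPARK_HOME"))
print("PYSPARK_PYTHON =", os.environ.get("PYSPARK_PYTHON"))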
I set up a small PySpark project in which I create the spark_session:
from pyspark.sql import SparkSession
import findspark
import boto3


def create_session() -> SparkSession:
    findspark.init()
    spark_session = SparkSession.builder.appName("delta_session") \
        .master("local[*]") \
        .getOrCreate()
    sparkContext = spark_session.sparkContext
    boto_default_session = boto3.setup_default_session()
    boto_session = boto3.Session(
        botocore_session=boto_default_session, profile_name="dev", region_name="eu-west-1"
    )
    credentials = boto_session.get_credentials()
    print(
        f"Hadoop version = {sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}"
    )
    hadoopConfiguration = sparkContext._jsc.hadoopConfiguration()
    hadoopConfiguration.set(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider"
    )
    hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConfiguration.set("fs.s3a.awsAccessKeyId", credentials.access_key)
    hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", credentials.secret_key)
    hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
    return spark_session
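For reference, roughly the same S3A settings can also be supplied through the builder before the context exists (just a sketch I considered, using the fs.s3a.access.key / fs.s3a.secret.key property names of hadoop-aws 3.x and the same "dev" profile assumption as above):

from pyspark.sql import SparkSession
import boto3


def create_session_via_builder() -> SparkSession:
    # Resolve the same AWS profile credentials as in create_session above
    credentials = boto3.Session(profile_name="dev", region_name="eu-west-1").get_credentials()
    # Options with the "spark.hadoop." prefix end up in the Hadoop configuration
    # of the SparkContext that getOrCreate() builds
    return (
        SparkSession.builder.appName("delta_session")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.access.key", credentials.access_key)
        .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key)
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
        .getOrCreate()
    )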
Then I run:
spark_session = create_session()
from delta.tables import *
delta_table = DeltaTable.forPath(spark_session, "s3a://<my-path-to-delta-table>")
# This works
df = delta_table.toDF()
print(df.show(10))
# This fails
delta_table.generate("symlink_format_manifest")
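For what it's worth, the same manifest generation can also be expressed in SQL (a sketch; it assumes the Delta SQL extension enabled via the spark-submit --conf below, and uses the same path placeholder):

# SQL form of the generate call above
spark_session.sql(
    "GENERATE symlink_format_manifest FOR TABLE delta.`s3a://<my-path-to-delta-table>`"
)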
I am able to retrieve the delta files and create the DataFrame, and everything looks fine. Then I try to call delta_table.generate, but I get this error:
Traceback (most recent call last):
File "/run.py", line 33, in
delta_table.generate("symlink_format_manifest")
File "/private/var/folders/c8/sj3rz_k14cs58nqwr3m9zsxc0000gq/T/spark-ba2ce53e-c9f8-49d4-98d5-21d9581b05f4/userFiles-b6d820f0-4e96-4e27-8808-a14b9e93928a/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 74, in generate
File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in call
File "<poetry_python_path>/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "<poetry_python_path>/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.generate.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default()Z
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generatePartitionPathExpression(GenerateSymlinkManifest.scala:350)
at scala.collection.TraversableLike.$anonfun$flatMap(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression(GenerateSymlinkManifest.scala:349)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generatePartitionPathExpression$(GenerateSymlinkManifest.scala:345)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generatePartitionPathExpression(GenerateSymlinkManifest.scala:41)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.withRelativePartitionDir(GenerateSymlinkManifest.scala:338)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.writeManifestFiles(GenerateSymlinkManifest.scala:262)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$generateFullManifest(GenerateSymlinkManifest.scala:180)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.withStatusCode(GenerateSymlinkManifest.scala:41)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.$anonfun$recordManifestGeneration(GenerateSymlinkManifest.scala:365)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordOperation(GenerateSymlinkManifest.scala:41)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.recordDeltaOperation(GenerateSymlinkManifest.scala:41)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.recordManifestGeneration(GenerateSymlinkManifest.scala:364)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest(GenerateSymlinkManifest.scala:167)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifestImpl.generateFullManifest$(GenerateSymlinkManifest.scala:165)
at org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest$.generateFullManifest(GenerateSymlinkManifest.scala:41)
at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc(DeltaGenerateCommand.scala:58)
at org.apache.spark.sql.delta.commands.DeltaGenerateCommand$.$anonfun$modeNameToGenerationFunc$adapted(DeltaGenerateCommand.scala:58)
at org.apache.spark.sql.delta.commands.DeltaGenerateCommand.run(DeltaGenerateCommand.scala:50)
at io.delta.tables.execution.DeltaTableOperations.executeGenerate(DeltaTableOperations.scala:54)
at io.delta.tables.execution.DeltaTableOperations.executeGenerate$(DeltaTableOperations.scala:48)
at io.delta.tables.DeltaTable.executeGenerate(DeltaTable.scala:45)
at io.delta.tables.DeltaTable.generate(DeltaTable.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
I invoke the application with:
poetry run spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" run.py
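(As a side note, a rough programmatic equivalent of these spark-submit flags, e.g. for launching with plain python run.py, would be the following sketch:)

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder.appName("delta_session")
    .master("local[*]")
    # same artifacts as the --packages flag above
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:0.8.0,"
        "com.amazonaws:aws-java-sdk-pom:1.11.375,"
        "org.apache.hadoop:hadoop-aws:3.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.delta.logStore.class",
        "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore",
    )
    .getOrCreate()
)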
What I have tried:
- I tried downloading Spark directly and doing this without poetry
- I tried using an older hadoop version, since that is what they seem to use here
- I found this thread, but it did not help me
- I also tried io.delta:delta-core_2.12:0.8.0
- I have verified that delta versions 0.7.0 and 0.8.0 should support Spark 3.1.1
- I also tried adding pyarrow and enabling it via: spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
- I also tried adding hadoop-common 3.2.0 with --packages org.apache.hadoop:hadoop-common:3.2.0, but that did not help either
- I also tried running it against Spark 3.1.1 and Hadoop 3.2.0 while passing --packages "io.delta:delta-core_2.12:0.7.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:2.7.7", but that gave me the error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.lang.NumberFormatException: For input string: "100M"
It looks to me as if org.apache.spark.sql.catalyst.expressions.ScalaUDF$.apply$default()Z cannot be called for some reason. Is there something more I need to install that I have not found?
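(A small diagnostic sketch I use to confirm which versions actually end up on the driver; the Scala check matters because delta-core_2.12 only works with Scala 2.12:)

import pyspark

jvm = spark_session.sparkContext._jvm
print("pyspark =", pyspark.__version__)
print("spark   =", spark_session.version)
print("hadoop  =", jvm.org.apache.hadoop.util.VersionInfo.getVersion())
print("java    =", jvm.System.getProperty("java.version"))
print("scala   =", jvm.scala.util.Properties.versionNumberString())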
My pyproject.toml:
[tool.poetry]
name = "..."
version = "1.0.0"
description = "..."
authors = ["..."]
[tool.poetry.dependencies]
python = "3.7.8"
pre-commit = "^2.8.2"
pyspark = {version="3.1.1", optional=true, extras=["sql"]}
findspark = "^1.4.2"
boto3 = "*"
pyarrow = "3.0.0"
[tool.poetry.dev-dependencies]
pytest = "6.1.1"
ipdb = "0.13.3"
pytest-cov = "2.10.1"
Many thanks in advance to anyone who may have run into the same problem.
Update
Following Alex's comment, I resolved this with:
- Spark version 3.0.2
- Hadoop version 3.2.0
- Delta 0.8.0
spark-submit --packages "io.delta:delta-core_2.12:0.8.0,com.amazonaws:aws-java-sdk-pom:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" ~/code/dataops-delta-infrastructure/run.py
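For completeness, the only change this implies in my pyproject.toml is pinning PySpark to the 3.0.x line (a sketch; the rest of the file stays as above):

pyspark = {version="3.0.2", optional=true, extras=["sql"]}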
You need to downgrade to Spark 3.0.2 to use Delta 0.8.0 - unfortunately, Spark 3.1.1 made a lot of changes to the internals that Delta uses under the hood, which broke binary compatibility. Your specific issue is most likely caused by SPARK-32154, which changed the parameters of the ScalaUDF (this line).
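A quick way to catch this mismatch early (just a sketch, not part of Delta's API) is to check the running Spark version before calling generate:

# delta-core 0.8.0 targets Spark 3.0.x, so fail fast on 3.1+ instead of
# hitting the NoSuchMethodError deep inside GenerateSymlinkManifest.
major, minor = (int(x) for x in spark_session.version.split(".")[:2])
if (major, minor) >= (3, 1):
    raise RuntimeError(
        f"Spark {spark_session.version} is not binary-compatible with delta-core 0.8.0; "
        "use Spark 3.0.x or a Delta release built for this Spark version."
    )
delta_table.generate("symlink_format_manifest")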