GCP dataproc - java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
I'm trying to run a Structured Streaming job on GCP Dataproc that reads from Kafka and prints out the values. The code errors out with -> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
The code is as follows:
import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag,monotonically_increasing_id
from pyspark.sql import SparkSession, Window
from confluent_kafka import Producer
from google.cloud import storage
spark = SparkSession.builder.appName('StructuredStreaming_VersaSase').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
kafkaBrokers='34.75.148.41:9094'
topic = "versa-sase"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"
client = storage.Client()
print(" client ", client)
bucket = client.get_bucket('ssl-certs-karan')
print(" bucket ", bucket)
allblobs = bucket.list_blobs()
print(" allblobs -> ", allblobs)
for b in allblobs:
    print(" b -> ", b)
blob_ssl_truststore_location = bucket.get_blob('ca.p12')
print(" blob_ssl_truststore_location.name ", blob_ssl_truststore_location.name)
blob_ssl_truststore_location.download_to_filename(blob_ssl_truststore_location.name)
ssl_truststore_location=blob_ssl_truststore_location.name
print(" type - blob_ssl_truststore_location ", type(blob_ssl_truststore_location))
ssl_truststore_password="NAvqbh5c9fB4"
blob_ssl_keystore_location = bucket.get_blob('dataproc-versa-sase.p12')
print(" blob_ssl_keystore_location.name ", blob_ssl_keystore_location.name)
blob_ssl_keystore_location.download_to_filename(blob_ssl_keystore_location.name)
ssl_keystore_location=blob_ssl_keystore_location.name
ssl_keystore_password="jBGsWrBv7258"
consumerGroupId = "versa-sase-grp"
checkpoint = "gs://ss-checkpoint/"
print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)
df = spark.read.format('kafka')\
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.truststore.location",ssl_truststore_location) \
.option("kafka.ssl.truststore.password",ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location)\
.option("kafka.ssl.keystore.password", ssl_keystore_password)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "earliest") \
.load()
#
query = df.selectExpr("CAST(value AS STRING)") \
.write \
.format("console") \
.option("numRows",100)\
.option("checkpointLocation", checkpoint) \
.option("outputMode", "complete")\
.save("output")
# query.awaitTermination()
Command to launch the job on the Dataproc cluster:
gcloud dataproc jobs submit pyspark \
    StructuredStreaming_Kafka_GCP-Batch-feb1.py --cluster=dataproc-ss-poc \
    --jars=gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar \
    --region=us-central1
Error:
SPARK.SPARKCONTEXT -> <SparkContext master=yarn appName=StructuredStreaming_VersaSase>
Traceback (most recent call last):
File "/tmp/b87ff69307344e2db5b43f4a73c377cf/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
df = spark.read.format('kafka')\
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:556)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:336)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:127)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:307)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
I've checked the Spark version on the Dataproc cluster: Spark 3.1.2 with Scala 2.12, so the version of the spark-sql-kafka jar being passed seems correct.
Are there any other jars that need to be passed?
What needs to be done to fix/debug this issue?
TIA!
Please see the official deployment guide here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying
Extracting the important part:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 ...
To summarize, use --packages instead of --jars, because it takes care of transitive dependencies.
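When submitting with gcloud dataproc jobs submit pyspark (which has no --packages flag), the same coordinates can be passed through the spark.jars.packages Spark property. A minimal sketch, reusing the script, cluster, and region from the question:
gcloud dataproc jobs submit pyspark StructuredStreaming_Kafka_GCP-Batch-feb1.py \
    --cluster=dataproc-ss-poc \
    --region=us-central1 \
    --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2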
I was able to resolve this by passing the package as follows,
i.e. --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
Please note: I had initially also tried adding the individual jars to fix the issue, but apparently that is not the correct approach.
gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2.py --cluster dataproc-ss-poc --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --region us-central1
The missing class org/apache/kafka/common/serialization/ByteArraySerializer is in the kafka-clients package, which is a dependency of the spark-sql-kafka-0-10_2.12 package.
You can either use --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 to pull in the transitive dependencies automatically, or use --jars=gs://my-bucket/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://my-bucket/kafka-clients-0.10.2.2.jar to add all the dependencies yourself.
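Spelled out as a full command, the --jars route would look like the following. This is only a sketch: it assumes both jars have been uploaded to the gs://my-bucket placeholder used above and reuses the script, cluster, and region from the question:
gcloud dataproc jobs submit pyspark StructuredStreaming_Kafka_GCP-Batch-feb1.py \
    --cluster=dataproc-ss-poc \
    --region=us-central1 \
    --jars=gs://my-bucket/spark-sql-kafka-0-10_2.12-3.1.2.jar,gs://my-bucket/kafka-clients-0.10.2.2.jar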