Spark/Kafka 将 Spark 2.4 更新到 Spark 3.1 后出错

Spark/Kafka error after updating Spark 2.4 to Spark 3.1

我尝试使用 Debian Linux 和 Kafka 从 Spark 2.4.1 升级到 Spark 3.1.2。因此,我也将 Kafka 从 2.4.1 更新到 2.8,但这似乎不是问题所在。我检查了 https://spark.apache.org/docs/latest/ 的依赖项,到目前为止似乎还不错。

对于 Spark 2.4.1,我在 sparks 目录中使用了这些额外的 jar:

对于 Spark 3.1.2,我更新了这些 jar 并且已经添加了一些其他文件已经存在,比如未使用:

我将我的 pyspark 代码条带化为适用于 spark 2.4.1 但不适用于 Spark 3.1.2 的代码:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.utils import AnalysisException
import datetime

# configuration of target db
db_target_url = "jdbc:mysql://localhost/test"
db_target_properties = {"user": "john", "password": "doe"}

# create spark session
spark = SparkSession.builder.appName("live1").getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)

# create schema for the json iba data
schema_tww_vs = T.StructType([T.StructField("[22:8]", T.DoubleType()),\
                           T.StructField("[1:3]", T.DoubleType()),\
                           T.StructField("Timestamp", T.StringType())])

# create dataframe representing the stream and take the json data into a usable df structure
d = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test_so") \
  .load() \
  .selectExpr("timestamp", "cast (value as string) as json") \
  .select("timestamp", F.from_json("json", schema_tww_vs).alias("struct")) \
  .selectExpr("timestamp", "struct.*") \

# add timestamp of this spark processing
d = d.withColumn("time_spark", F.current_timestamp())
d1 = d.withColumnRenamed('[1:3]','signal1') \
  .withColumnRenamed('[22:8]','ident_orig') \
  .withColumnRenamed('timestamp','time_kafka') \
  .withColumnRenamed('Timestamp','time_source')
d1 = d1.withColumn("ident", F.round(d1["ident_orig"]).cast('integer'))

d4 = d1.where("signal1 > 3000")

d4a = d4.withWatermark("time_kafka", "1 second") \
    .groupby('ident', F.window('time_kafka', "5 second")) \
    .agg(
         F.count("*").alias("count"), \
         F.min("time_kafka").alias("time_start"), \
         F.round(F.avg("signal1"),1).alias('signal1_avg'),)

# Remove the column "windows" since this struct (with start and stop time) cannot be written to the db
d4a = d4a.drop('window')

d8a = d4a.select('time_start', 'ident', 'count', 'signal1_avg')

# write the dataframe into the database using the streaming mode
def write_into_sink(df, epoch_id):
    df.write.jdbc(table="test_so", mode="append", url=db_target_url, properties=db_target_properties)
    pass
query_write_sink = d8a.writeStream \
     .foreachBatch(write_into_sink) \
     .trigger(processingTime = "1 seconds") \
     .start()

一些错误是:

java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
Jul 22 15:41:22 run [847]: #011at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:623)
Jul 22 15:41:22 run [847]: #011at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
…
jupyterhub-start.sh[847]: 21/07/22 15:41:22 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Jul 22 15:41:22 run [847]: 21/07/22 15:41:22 ERROR MicroBatchExecution: Query [id = 5d2a70aa-1463-48f3-a4a6-995ceef22891, runId = d1f856b5-eb0c-4635-b78a-d55e7ce81f2b] terminated with error
Jul 22 15:41:22 run [847]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
Jul 22 15:41:22 run [847]:   File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/java_gateway.py", line 2451, in _call_proxy
Jul 22 15:41:22 run [847]:     return_value = getattr(self.pool[obj_id], method)(*params)
Jul 22 15:41:22 run [847]:   File "/opt/spark/python/pyspark/sql/utils.py", line 196, in call
Jul 22 15:41:22 run [847]:     raise e
Jul 22 15:41:22 run [847]:   File "/opt/spark/python/pyspark/sql/utils.py", line 193, in call
Jul 22 15:41:22 run [847]:     self.func(DataFrame(jdf, self.sql_ctx), batch_id)
Jul 22 15:41:22 run [847]:   File "<ipython-input-10-d40564c31f71>", line 3, in write_into_sink
Jul 22 15:41:22 run [847]:     df.write.jdbc(table="test_so", mode="append", url=db_target_url, properties=db_target_properties)
Jul 22 15:41:22 run [847]:   File "/opt/spark/python/pyspark/sql/readwriter.py", line 1445, in jdbc
Jul 22 15:41:22 run [847]:     self.mode(mode)._jwrite.jdbc(url, table, jprop)
Jul 22 15:41:22 run [847]:   File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/java_gateway.py", line 1310, in __call__
Jul 22 15:41:22 run [847]:     answer, self.gateway_client, self.target_id, self.name)
Jul 22 15:41:22 run [847]:   File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco
Jul 22 15:41:22 run [847]:     return f(*a, **kw)
Jul 22 15:41:22 run [847]:   File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
Jul 22 15:41:22 run [847]:     format(target_id, ".", name), value)
Jul 22 15:41:22 run [847]: py4j.protocol.Py4JJavaError: An error occurred while calling o101.jdbc.
Jul 22 15:41:22 run [847]: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 200) (master executor driver): java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig

您知道导致此错误的原因吗?

正如 devesh 所说,缺少一个 jar 文件: