Spark/Kafka error after updating Spark 2.4 to Spark 3.1
I am trying to upgrade from Spark 2.4.1 to Spark 3.1.2 on Debian Linux with Kafka. As part of this I also updated Kafka from 2.4.1 to 2.8, but that does not seem to be the problem. I checked the dependencies listed at https://spark.apache.org/docs/latest/ and so far they look fine.
For Spark 2.4.1 I used these additional jars in the Spark jars directory:
- slf4j-api-1.7.26.jar
- unused-1.0.0.jar
- lz4-java-1.6.0.jar
- kafka-clients-2.3.0.jar
- spark-streaming-kafka-0-10_2.11-2.4.3.jar
- spark-sql-kafka-0-10_2.11-2.4.3.jar
For Spark 3.1.2 I updated these jars; some other files, such as unused-1.0.0.jar, were already present and stayed unchanged (see the sketch after this list for an alternative way to supply them):
- spark-sql-kafka-0-10_2.12-3.1.2.jar
- spark-streaming-kafka-0-10_2.12-3.1.2.jar
- spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
- spark-token-provider-kafka-0-10_2.12-3.1.2.jar
- kafka-clients-2.8.0.jar
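As an aside (not from the original post): instead of copying jars into the Spark installation, the same files could presumably be supplied per session via the spark.jars option. The paths below are hypothetical placeholders.

from pyspark.sql import SparkSession

# Hypothetical local paths -- adjust to wherever the jars actually live.
extra_jars = ",".join([
    "/opt/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar",
    "/opt/jars/spark-token-provider-kafka-0-10_2.12-3.1.2.jar",
    "/opt/jars/kafka-clients-2.8.0.jar",
])

spark = (SparkSession.builder
         .appName("live1")
         .config("spark.jars", extra_jars)  # comma-separated list of local jar paths
         .getOrCreate())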
I stripped my pyspark code down to the following, which works with Spark 2.4.1 but not with Spark 3.1.2:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.utils import AnalysisException
import datetime
# configuration of target db
db_target_url = "jdbc:mysql://localhost/test"
db_target_properties = {"user": "john", "password": "doe"}
# create spark session
spark = SparkSession.builder.appName("live1").getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)
# create schema for the json iba data
schema_tww_vs = T.StructType([T.StructField("[22:8]", T.DoubleType()),
                              T.StructField("[1:3]", T.DoubleType()),
                              T.StructField("Timestamp", T.StringType())])
# create dataframe representing the stream and take the json data into a usable df structure
d = spark.readStream \
    .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_so") \
    .load() \
    .selectExpr("timestamp", "cast (value as string) as json") \
    .select("timestamp", F.from_json("json", schema_tww_vs).alias("struct")) \
    .selectExpr("timestamp", "struct.*")
# add timestamp of this spark processing
d = d.withColumn("time_spark", F.current_timestamp())
d1 = d.withColumnRenamed('[1:3]', 'signal1') \
    .withColumnRenamed('[22:8]', 'ident_orig') \
    .withColumnRenamed('timestamp', 'time_kafka') \
    .withColumnRenamed('Timestamp', 'time_source')
d1 = d1.withColumn("ident", F.round(d1["ident_orig"]).cast('integer'))
d4 = d1.where("signal1 > 3000")
d4a = d4.withWatermark("time_kafka", "1 second") \
    .groupby('ident', F.window('time_kafka', "5 second")) \
    .agg(
        F.count("*").alias("count"),
        F.min("time_kafka").alias("time_start"),
        F.round(F.avg("signal1"), 1).alias('signal1_avg'),
    )
# Remove the column "window" since this struct (with start and stop time) cannot be written to the db
d4a = d4a.drop('window')
d8a = d4a.select('time_start', 'ident', 'count', 'signal1_avg')
# write the dataframe into the database using the streaming mode
def write_into_sink(df, epoch_id):
    df.write.jdbc(table="test_so", mode="append", url=db_target_url, properties=db_target_properties)
query_write_sink = d8a.writeStream \
    .foreachBatch(write_into_sink) \
    .trigger(processingTime="1 seconds") \
    .start()
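(Not part of the original post) When this runs as a standalone script rather than in a notebook, the driver presumably needs to block until the query stops, e.g.:

query_write_sink.awaitTermination()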
Some of the errors are:
java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
Jul 22 15:41:22 run [847]:     at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:623)
Jul 22 15:41:22 run [847]:     at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
…
jupyterhub-start.sh[847]: 21/07/22 15:41:22 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Jul 22 15:41:22 run [847]: 21/07/22 15:41:22 ERROR MicroBatchExecution: Query [id = 5d2a70aa-1463-48f3-a4a6-995ceef22891, runId = d1f856b5-eb0c-4635-b78a-d55e7ce81f2b] terminated with error
Jul 22 15:41:22 run [847]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
Jul 22 15:41:22 run [847]: File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/java_gateway.py", line 2451, in _call_proxy
Jul 22 15:41:22 run [847]: return_value = getattr(self.pool[obj_id], method)(*params)
Jul 22 15:41:22 run [847]: File "/opt/spark/python/pyspark/sql/utils.py", line 196, in call
Jul 22 15:41:22 run [847]: raise e
Jul 22 15:41:22 run [847]: File "/opt/spark/python/pyspark/sql/utils.py", line 193, in call
Jul 22 15:41:22 run [847]: self.func(DataFrame(jdf, self.sql_ctx), batch_id)
Jul 22 15:41:22 run [847]: File "<ipython-input-10-d40564c31f71>", line 3, in write_into_sink
Jul 22 15:41:22 run [847]: df.write.jdbc(table="test_so", mode="append", url=db_target_url, properties=db_target_properties)
Jul 22 15:41:22 run [847]: File "/opt/spark/python/pyspark/sql/readwriter.py", line 1445, in jdbc
Jul 22 15:41:22 run [847]: self.mode(mode)._jwrite.jdbc(url, table, jprop)
Jul 22 15:41:22 run [847]: File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/java_gateway.py", line 1310, in __call__
Jul 22 15:41:22 run [847]: answer, self.gateway_client, self.target_id, self.name)
Jul 22 15:41:22 run [847]: File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco
Jul 22 15:41:22 run [847]: return f(*a, **kw)
Jul 22 15:41:22 run [847]: File "/opt/anaconda/envs/env1/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
Jul 22 15:41:22 run [847]: format(target_id, ".", name), value)
Jul 22 15:41:22 run [847]: py4j.protocol.Py4JJavaError: An error occurred while calling o101.jdbc.
Jul 22 15:41:22 run [847]: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 200) (master executor driver): java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
Do you know what is causing this error?
As devesh said, one jar file was missing:
- commons-pool2-2.8.0.jar, which can be downloaded from https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.8.0
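Alternatively (a sketch, not part of the original answer): letting Spark resolve the Kafka connector from Maven pulls in its transitive dependencies, including commons-pool2, automatically. The coordinate below matches the Spark 3.1.2 / Scala 2.12 build used above.

from pyspark.sql import SparkSession

# Resolve spark-sql-kafka-0-10 and its transitive dependencies
# (kafka-clients, commons-pool2, the token provider, ...) from Maven Central.
spark = (SparkSession.builder
         .appName("live1")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
         .getOrCreate())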