PySpark Kafka - NoClassDefFoundError: org/apache/commons/pool2

I am having trouble printing data from a Kafka topic to the console. The error message I receive is shown in the image below.

As you can see in the image above, it does not process anything beyond batch 0.

These are all snapshots of the error messages. I don't understand the root cause of the error. Please help me.

The Kafka and Spark versions are as follows:

spark version: spark-3.1.1-bin-hadoop2.7
kafka version: kafka_2.13-2.7.0

I am using the following JARs:

kafka-clients-2.7.0.jar 
spark-sql-kafka-0-10_2.12-3.1.1.jar 
spark-token-provider-kafka-0-10_2.12-3.1.1.jar 

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

spark = SparkSession \
    .builder \
    .appName("Pyspark structured streaming with kafka and cassandra") \
    .master("local[*]") \
    .config("spark.jars","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraLibrary","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.driver.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


# streaming dataframe that reads from kafka topic
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()

print("Printing schema of df_kafka:")
df_kafka.printSchema()

# converting data from kafka broker to string type
df_kafka_string = df_kafka.selectExpr("CAST(value AS STRING) as value")

# schema to read json format data
ts_schema = StructType() \
    .add("id_str", StringType()) \
    .add("created_at", StringType()) \
    .add("text", StringType())

# parse json data
df_kafka_string_parsed = df_kafka_string.select(from_json(col("value"), ts_schema).alias("twts"))

df_kafka_string_parsed_format = df_kafka_string_parsed.select("twts.*")
df_kafka_string_parsed_format.printSchema()

df = df_kafka_string_parsed_format.writeStream \
    .trigger(processingTime="1 seconds") \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()

df.awaitTermination()

The error (NoClassDefFound, followed by the kafka010 package) is saying that spark-sql-kafka-0-10 is missing its transitive dependency on org.apache.commons:commons-pool2:2.6.2, as you can see here.

You can either download that JAR as well, or you can change your code to use --packages instead of the spark.jars option and let Ivy handle downloading the transitive dependencies.
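For the first option, the missing artifact can be fetched from Maven Central, which follows the standard group/artifact/version layout. A sketch of constructing the URL (the version 2.6.2 comes from the error above; the local filename is up to you):

```shell
# Maven Central URL for the missing transitive dependency
# (version taken from the NoClassDefFoundError above)
VERSION="2.6.2"
JAR="commons-pool2-${VERSION}.jar"
URL="https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/${VERSION}/${JAR}"
echo "$URL"
# Download it next to your other jars and add it to spark.jars, e.g.:
#   curl -fL -o "$JAR" "$URL"
```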

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache...'

spark = SparkSession.builder...
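A more complete sketch of the --packages approach, assuming coordinates that match the versions in the question (Spark 3.1.1, Scala 2.12); Ivy then pulls commons-pool2 and the other Kafka jars transitively, so the spark.jars config can be dropped for those:

```python
import os

# Coordinates matching the versions in the question (assumption: Spark 3.1.1 / Scala 2.12)
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"

# The trailing "pyspark-shell" token is needed when setting this variable
# from Python rather than passing --packages on the spark-submit command line.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {packages} pyspark-shell"

# Then build the session as before, without listing the Kafka jars by hand:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .appName("Pyspark structured streaming with kafka and cassandra") \
#     .master("local[*]") \
#     .getOrCreate()
```

Note that the environment variable must be set before the first SparkSession is created, since the JVM reads it at startup.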