Trying to use a John Snow Labs pretrained pipeline on a Spark DataFrame, but unable to read a Delta file in the same session

I am reading a Spark DataFrame from HDFS with the following code:

from delta import *
from pyspark.sql import SparkSession

# build a session with the Delta Lake SQL extension and catalog enabled
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# change the file path here
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')

delta_df.show(10, truncate=False)

And the following code that uses the pretrained pipeline:

from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# spark session one way
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2").getOrCreate()


# alternate way: uncomment the line below to use it instead
#spark=sparknlp.start(spark32=True)


# unzip the file and change path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")


print("-------")

# creating a spark data frame from the sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')

# passing dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)

#printing the result
print(result)

print("DONE!!!")

I would like to merge these two pieces of code, but the two Spark sessions do not merge, and a single session cannot handle both tasks. Please help!

I tried merging the .config() options of the two Spark sessions, without success. I also tried creating two separate Spark sessions, but that did not work either.

A plain Spark session is enough to read files in other formats, but to read Delta files I have to use configure_spark_with_delta_pip(builder).

Is there any way to work around this, or to make the code run?

configure_spark_with_delta_pip is just a shortcut for setting the correct SparkSession parameters. If you look at its source code, you will see that all it does is configure spark.jars.packages. But because you set spark.jars.packages separately for Spark NLP, you overwrite Delta's value.
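
For illustration only, here is a minimal sketch of the behaviour described above (the function name sketch_configure and the hard-coded coordinate are assumptions, not Delta's actual source code):

from pyspark.sql import SparkSession

def sketch_configure(builder, delta_coord="io.delta:delta-core_2.12:1.1.0"):
    # roughly what the helper does: point spark.jars.packages at the Delta
    # Maven coordinate. A later .config("spark.jars.packages", "<spark-nlp coordinate>")
    # call therefore replaces this value instead of adding to it.
    return builder.config("spark.jars.packages", delta_coord)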

Update 14.04.2022: this was not yet released when the answer was written, but it is available in version 1.2.0.

To handle such situations, configure_spark_with_delta_pip has an additional argument, extra_packages, for specifying extra packages to be configured. So in your case the code should look like this:

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")

my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
    .getOrCreate()
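
This works because the Delta coordinate and the packages passed in extra_packages end up together in one comma-separated spark.jars.packages value, so neither setting overwrites the other.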

Until the release with this extra argument is out, you need to avoid that function and configure all parameters yourself, like this:

scala_version = "2.12"
delta_version = "1.1.0"
all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2",
                f"io.delta:delta-core_{scala_version}:{delta_version}"]

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", ",".join(all_packages)) \
    .getOrCreate()
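
With a single session configured like this, both halves of the original code can run together. A rough sketch, reusing the paths from the question (it assumes the Delta table has a column named 'text' that the pipeline can consume; select or rename as needed):

from sparknlp.pretrained import PipelineModel

# read the Delta table with the same session
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')

# load the pretrained pipeline and run it on the Delta data
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
result = pipeline.transform(delta_df.select("text"))
result.show(10, truncate=False)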