Trying to use a JohnSnowLabs pretrained pipeline on a Spark dataframe but unable to read a Delta file in the same session
I am using the following code to read a Spark dataframe from HDFS:
from delta import *
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# change file path here
delta_df = spark.read.format("delta").load("hdfs://localhost:9000/final_project/data/2022-03-30/")
delta_df.show(10, truncate=False)
and the following code to use the pretrained pipeline:
from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# build the Spark session one way
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()

# alternate way, uncomment below to use
# spark = sparknlp.start(spark32=True)

# unzip the file and change the path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
print("-------")

# creating a Spark dataframe from a sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF("text")

# passing the dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)

# printing the result
print(result)
print("DONE!!!")
I want to combine these two pieces of code, but the two Spark sessions do not merge and cannot handle both tasks at once. Please help!
I tried merging the .config() options of the two Spark sessions, but that did not work.
I also tried creating two separate Spark sessions, but that did not work either.
A plain Spark session is enough for reading files in other formats, but to read Delta files I am forced to go through configure_spark_with_delta_pip(builder).
Is there any way around this, or to make the code run?
configure_spark_with_delta_pip is just a shortcut for setting the correct SparkSession parameters. If you look at its source code, you will find that all it really does is configure spark.jars.packages. But because you set that same option separately for Spark NLP, you overwrite Delta's value.
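For illustration only, here is a minimal sketch of roughly what that helper does, assuming a Scala 2.12 build and a hard-coded Delta version (the real implementation instead looks up the version of the installed delta-spark package; the function name below is made up for this sketch):

# Minimal sketch, not the actual source: the helper only appends the Delta
# Maven coordinate to spark.jars.packages on the builder you pass in.
def configure_spark_with_delta_pip_sketch(builder, extra_packages=None):
    scala_version = "2.12"   # assumption: Scala 2.12 build of Delta
    delta_version = "1.2.0"  # assumption: the real helper reads the installed delta-spark version
    delta_artifact = f"io.delta:delta-core_{scala_version}:{delta_version}"
    all_artifacts = [delta_artifact] + (extra_packages or [])
    return builder.config("spark.jars.packages", ",".join(all_artifacts))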
Update 14.04.2022: this was not released at the time of answering, but it is available in version 1.2.0.
To handle such situations, configure_spark_with_delta_pip has an additional argument, extra_packages, for specifying the extra packages to configure. So in your case the code should look like this:
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")

my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
    .getOrCreate()
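With this single session both workloads from your question should then run back to back. As a rough sanity check (an untested sketch, with the paths copied from the question):

from sparknlp.pretrained import PipelineModel

# read the Delta table and run the pretrained pipeline in the same session
delta_df = spark.read.format("delta").load("hdfs://localhost:9000/final_project/data/2022-03-30/")
delta_df.show(10, truncate=False)

pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF("text")
pipeline.transform(df).show(truncate=False)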
Until the version with that extra argument is released, you need to avoid the helper and configure all the parameters yourself, like this:
scala_version = "2.12"
delta_version = "1.1.0"
all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2",
                f"io.delta:delta-core_{scala_version}:{delta_version}"]

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", ",".join(all_packages)) \
    .getOrCreate()
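Either way, you can quickly check that both coordinates actually made it into the session before loading any Delta table or Spark NLP model (a simple check, nothing specific to either library):

# both Maven coordinates should appear in spark.jars.packages
print(spark.sparkContext.getConf().get("spark.jars.packages"))
# expected to list both the Spark NLP and the Delta coordinates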