Is it possible to use the library Spark-NLP with Spark Structured Streaming?
I want to run tweet sentiment analysis on a stream of messages read from a Kafka cluster, which in turn receives the tweets from the Twitter API v2.
When I try to apply the pretrained sentiment analysis pipeline, I get an error message: Exception: target must be either a spark DataFrame, a list of strings or a string, and I would like to know whether there is a way around this.
I have looked through the documentation but could not find anything about streaming data.
Here is the code I am using:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, from_json, from_unixtime, unix_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType, MapType, ArrayType
from sparknlp.pretrained import PretrainedPipeline

spark = SparkSession.builder.appName('twitter_app')\
    .master("local[*]")\
    .config('spark.jars.packages',
            'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2')\
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')\
    .config("spark.driver.memory", "8G")\
    .config("spark.driver.maxResultSize", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .getOrCreate()

# Schema for the Twitter API v2 payload carried in the Kafka message value
schema = StructType() \
    .add("data", StructType()
         .add("created_at", TimestampType())
         .add("id", StringType())
         .add("text", StringType())) \
    .add("matching_rules", ArrayType(StructType()
         .add('id', StringType())
         .add('tag', StringType())))
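# For reference, the schema above is intended to parse Kafka message values
# shaped roughly like this hypothetical Twitter API v2 payload (the values
# are illustrative and not from the original post):
# {
#   "data": {
#     "created_at": "2022-03-01T12:00:00.000Z",
#     "id": "1498765432109876543",
#     "text": "Example tweet text"
#   },
#   "matching_rules": [{"id": "1498765432101112131", "tag": "NATO"}]
# }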
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094") \
    .option("subscribe", "Zelensky,Putin,Biden,NATO,NoFlyZone") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias('text'),
            col('topic'), col('key').cast('string'))

nlp_pipeline = PretrainedPipeline("analyze_sentimentdl_use_twitter", lang='en')

# The next statement is what raises the exception: annotate() is handed a Column
df = kafka_df.select('key',
                     col('text.data.created_at').alias('created_at'),
                     col('text.data.text').alias('text'),
                     'topic') \
    .withColumn('sentiment', nlp_pipeline.annotate(col('text.data.text')))
And then I get the error I mentioned earlier:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
Input In [11], in <cell line: 1>()
1 df = kafka_df.select('key',
2 col('text.data.created_at').alias('created_at'),
3 col('text.data.text').alias('text'),
4 'topic') \
----> 5 .withColumn('sentiment', nlp_pipeline.annotate(col('text.data.text')))
File ~/.local/share/virtualenvs/spark_home_lab-iuwyZNhT/lib/python3.9/site-packages/sparknlp/pretrained.py:183, in PretrainedPipeline.annotate(self, target, column)
181 return pipeline.annotate(target)
182 else:
--> 183 raise Exception("target must be either a spark DataFrame, a list of strings or a string")
Exception: target must be either a spark DataFrame, a list of strings or a string
Is it perhaps just not possible to process streaming data with Spark-NLP?
You can try nlp_pipeline.transform(kafka_df) in the following way:
text_df = kafka_df.select('key',
                          col('text.data.created_at').alias('created_at'),
                          col('text.data.text').alias('text'),
                          'topic')

df = (nlp_pipeline
      .transform(text_df)
      .select('key', 'created_at', 'text', 'topic', 'sentiment.result'))
df will be the structured stream you are looking for.
Because Spark-NLP is built on Spark ML, you can treat the structured stream kafka_df like any DataFrame. nlp_pipeline is a pyspark.ml.Pipeline, and the effective way to make predictions with a Pipeline is to call .transform(kafka_df). The annotate() call failed because it was handed a Column object, which is none of the accepted target types (a DataFrame, a list of strings, or a string).
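Note that selecting 'sentiment.result' produces a column named result holding an array of strings, one label per detected sentence. If a single label per tweet is preferred, a minimal sketch (assuming the first sentence's label is good enough, which is an assumption on my part) would be:

from pyspark.sql.functions import col, element_at

# Keep only the first sentiment label per tweet; assumes one sentence
# per tweet, which is an illustrative choice, not part of the answer.
flat_df = df.withColumn('result', element_at(col('result'), 1))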
Here is an example from the Spark NLP creators showing how the pipeline you are using was built:
https://nlp.johnsnowlabs.com/2021/01/18/sentimentdl_use_twitter_en.html
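For completeness, here is a minimal sketch of how the resulting streaming DataFrame could be started; the console sink, append output mode, and awaitTermination call are illustrative choices for local debugging, not part of the original answer:

# Start the streaming query; each micro-batch of annotated tweets is
# printed to stdout (console sink is for debugging only).
query = (df.writeStream
         .format("console")
         .outputMode("append")        # emit only newly arrived rows
         .option("truncate", "false")
         .start())

query.awaitTermination()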