Spark Streaming - 来自 Kafka 的推文流分类
Spark Streaming - Classification of tweets' stream from Kafka
我是 Spark 的新手,我绝对需要一些帮助来对来自 Kafka Stream 的推文进行分类。下面我将解释我到目前为止所做的步骤过程以及我卡住的地方。
我希望你们中的一些人能帮助我解决这个问题。
提前致谢。
上下文如下:
我有一个简单的 Kafka Producer 模拟推文的流(从文件读取)和一个 TweetAnalyzer Consumer 应该处理和一旦收到推文,就在 Spark Streaming Context 上对其进行分类。
为了对接收到的推文进行分类,我之前在磁盘上构建并存储了一个TF-IDF和Naive Bayes 在 Spark Streaming Context 启动之前加载的模型。
对于每条处理过的推文(词干提取、标点符号等),我应该计算其 TF-IDF 向量(特征向量)并分别利用 IDF 和朴素贝叶斯对其进行分类先前加载的模型。
开门见山,当我必须将推文的 项频率向量 (TF) 转换为其 TF-IDF 向量时,我的问题就出现了.
这是代码:
卡夫卡生产者
text_file = list(
csv.reader(
open('/twitterDataset/twitter/test_data.txt', 'rU')
)
)
for row in text_file:
time.sleep(1)
jd = json.dumps(row).encode('ascii')
producer.send(kafka_topic,jd)
TweetAnalyzer
#setting configuration
...
#reading configuration
...
#setting Kafka configuration
...
# Create Spark context
sc = SparkContext(
appName = app_name,
master = spark_master
)
# Create Streaming context
ssc = StreamingContext(
sc,
int(spark_batch_duration)
)
# Loading TF MODEL and compute TF-IDF
....
kafkaParams = {'metadata.broker.list"': kafka_brokers}
# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
ssc,
[kafka_topic],
{"metadata.broker.list": kafka_brokers}
)
obj1 = TweetPreProcessing()
lines = kvs.map(lambda x: x[1])
tweet = lines.flatMap(obj1.TweetBuilder)
hashingTF = HashingTF()
#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.map(lambda x: IDF().fit(x))
.pprint()
ssc.start()
ssc.awaitTermination()
在最后几行代码中,我无法在 x 上应用 IDF().fit(x) 函数,因为 Spark 需要一个"RDD of term frequency vectors" 而在这一点上,由于 Streaming Spark 上下文,我有一个 "Trasformed DStream"。
我尝试使用 transform() 或 foreachRDD() 函数而不是 map( ),但我不知道如何在转换后 return 正确地创建一个新的 DStream。
例如:
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.transform(classify_tweet)
.pprint()
def classify_tweet(tf):
#compute TF-IDF of the tweet
idf = IDF().fit(tf)
tf_idf = idf.transform(tf)
#print(tf_idf.collect())
return idf
如果我 运行 使用转换函数的代码,Spark 触发(在回溯的顶部)这个错误:
File
"/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call return r._jrdd
AttributeError: 'IDFModel' object
has no attribute '_jrdd'
但是如果我省略 return 语句并简单地打印 tf_idf 向量 它会给我正确的输出,如下所示:
[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449:
0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 0.0, 712732: 0.0, 861562: 0.0, 1040690: 0.0})] ...
如果我做对了,我认为问题是我不能 return a SparseVector 当它期望 DStream.
无论如何,这个问题有解决方案吗?
如果有人能帮我解决这个问题,我将非常感激,我很悲惨地被困住了。
谢谢
Return 转换 tf_idf
:
>>> def classify_tweet(tf):
... return IDF().fit(tf).transform(tf)
我是 Spark 的新手,我绝对需要一些帮助来对来自 Kafka Stream 的推文进行分类。下面我将解释我到目前为止所做的步骤过程以及我卡住的地方。
我希望你们中的一些人能帮助我解决这个问题。
提前致谢。
上下文如下:
我有一个简单的 Kafka Producer 模拟推文的流(从文件读取)和一个 TweetAnalyzer Consumer 应该处理和一旦收到推文,就在 Spark Streaming Context 上对其进行分类。
为了对接收到的推文进行分类,我之前在磁盘上构建并存储了一个TF-IDF和Naive Bayes 在 Spark Streaming Context 启动之前加载的模型。
对于每条处理过的推文(词干提取、标点符号等),我应该计算其 TF-IDF 向量(特征向量)并分别利用 IDF 和朴素贝叶斯对其进行分类先前加载的模型。
开门见山,当我必须将推文的 项频率向量 (TF) 转换为其 TF-IDF 向量时,我的问题就出现了.
这是代码:
卡夫卡生产者
text_file = list(
csv.reader(
open('/twitterDataset/twitter/test_data.txt', 'rU')
)
)
for row in text_file:
time.sleep(1)
jd = json.dumps(row).encode('ascii')
producer.send(kafka_topic,jd)
TweetAnalyzer
#setting configuration
...
#reading configuration
...
#setting Kafka configuration
...
# Create Spark context
sc = SparkContext(
appName = app_name,
master = spark_master
)
# Create Streaming context
ssc = StreamingContext(
sc,
int(spark_batch_duration)
)
# Loading TF MODEL and compute TF-IDF
....
kafkaParams = {'metadata.broker.list"': kafka_brokers}
# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
ssc,
[kafka_topic],
{"metadata.broker.list": kafka_brokers}
)
obj1 = TweetPreProcessing()
lines = kvs.map(lambda x: x[1])
tweet = lines.flatMap(obj1.TweetBuilder)
hashingTF = HashingTF()
#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.map(lambda x: IDF().fit(x))
.pprint()
ssc.start()
ssc.awaitTermination()
在最后几行代码中,我无法在 x 上应用 IDF().fit(x) 函数,因为 Spark 需要一个"RDD of term frequency vectors" 而在这一点上,由于 Streaming Spark 上下文,我有一个 "Trasformed DStream"。
我尝试使用 transform() 或 foreachRDD() 函数而不是 map( ),但我不知道如何在转换后 return 正确地创建一个新的 DStream。
例如:
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
.transform(classify_tweet)
.pprint()
def classify_tweet(tf):
#compute TF-IDF of the tweet
idf = IDF().fit(tf)
tf_idf = idf.transform(tf)
#print(tf_idf.collect())
return idf
如果我 运行 使用转换函数的代码,Spark 触发(在回溯的顶部)这个错误:
File "/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call return r._jrdd
AttributeError: 'IDFModel' object has no attribute '_jrdd'
但是如果我省略 return 语句并简单地打印 tf_idf 向量 它会给我正确的输出,如下所示:
[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449: 0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 0.0, 712732: 0.0, 861562: 0.0, 1040690: 0.0})] ...
如果我做对了,我认为问题是我不能 return a SparseVector 当它期望 DStream.
无论如何,这个问题有解决方案吗?
如果有人能帮我解决这个问题,我将非常感激,我很悲惨地被困住了。
谢谢
Return 转换 tf_idf
:
>>> def classify_tweet(tf):
... return IDF().fit(tf).transform(tf)