Spark Streaming - 来自 Kafka 的推文流分类

Spark Streaming - Classification of tweets' stream from Kafka

我是 Spark 的新手,我绝对需要一些帮助来对来自 Kafka Stream 的推文进行分类。下面我将解释我到目前为止所做的步骤过程以及我卡住的地方。

我希望你们中的一些人能帮助我解决这个问题。

提前致谢。

上下文如下:

我有一个简单的 Kafka Producer 模拟推文的流(从文件读取)和一个 TweetAnalyzer Consumer 应该处理和一旦收到推文,就在 Spark Streaming Context 上对其进行分类。

为了对接收到的推文进行分类,我之前在磁盘上构建并存储了一个TF-IDFNaive BayesSpark Streaming Context 启动之前加载的模型。

对于每条处理过的推文(词干提取、标点符号等),我应该计算其 TF-IDF 向量(特征向量)并分别利用 IDF 和朴素贝叶斯对其进行分类先前加载的模型。

开门见山,当我必须将推文的 项频率向量 (TF) 转换为其 TF-IDF 向量时,我的问题就出现了.

这是代码:

卡夫卡生产者

text_file = list(
    csv.reader(
        open('/twitterDataset/twitter/test_data.txt', 'rU')
    )
)

for row in text_file:
    time.sleep(1)
    jd = json.dumps(row).encode('ascii')
    producer.send(kafka_topic,jd)

TweetAnalyzer

#setting configuration
...  
#reading configuration
...
#setting Kafka configuration
...

# Create Spark context
sc = SparkContext(
    appName = app_name,
    master  = spark_master
)

# Create Streaming context
ssc = StreamingContext(
    sc,
    int(spark_batch_duration)
)

# Loading TF MODEL and compute TF-IDF
....

kafkaParams = {'metadata.broker.list"': kafka_brokers}

# Create direct kafka stream with brokers and topics
kvs = KafkaUtils.createDirectStream(
    ssc,
    [kafka_topic],
    {"metadata.broker.list": kafka_brokers}
)

obj1 = TweetPreProcessing()

lines = kvs.map(lambda x: x[1])

tweet = lines.flatMap(obj1.TweetBuilder)

hashingTF = HashingTF()

#computing TF for each tweet
tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
                .map(lambda x: IDF().fit(x))
                .pprint()

ssc.start()
ssc.awaitTermination()

在最后几行代码中,我无法在 x 上应用 IDF().fit(x) 函数,因为 Spark 需要一个"RDD of term frequency vectors" 而在这一点上,由于 Streaming Spark 上下文,我有一个 "Trasformed DStream"。

我尝试使用 transform()foreachRDD() 函数而不是 map( ),但我不知道如何在转换后 return 正确地创建一个新的 DStream。

例如:

tf_tweet = tweet.map(lambda tup: hashingTF.transform(tup[0:]))\
                .transform(classify_tweet)
                .pprint()

def classify_tweet(tf):

    #compute TF-IDF of the tweet
    idf = IDF().fit(tf)
    tf_idf = idf.transform(tf)

    #print(tf_idf.collect())

    return idf

如果我 运行 使用转换函数的代码,Spark 触发(在回溯的顶部)这个错误:

File "/workspace_spark/spark-1.6.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call return r._jrdd
AttributeError: 'IDFModel' object has no attribute '_jrdd'

但是如果我省略 return 语句并简单地打印 tf_idf 向量 它会给我正确的输出,如下所示:

[SparseVector(1048576, {164998: 0.0, 364601: 0.0, 924192: 0.0, 963449: 0.0})]
[SparseVector(1048576, {251465: 0.0, 821055: 0.0, 963449: 0.0})]
[SparseVector(1048576, {234762: 0.0, 280973: 0.0, 403903: 0.0, 712732: 0.0, 861562: 0.0, 1040690: 0.0})] ...

如果我做对了,我认为问题是我不能 return a SparseVector 当它期望 DStream.

无论如何,这个问题有解决方案吗?

如果有人能帮我解决这个问题,我将非常感激,我很悲惨地被困住了。

谢谢

Return 转换 tf_idf:

>>> def classify_tweet(tf):
...     return IDF().fit(tf).transform(tf)