Wordcount percentage in Spark Streaming (Python)
In the following example I receive a sequence of words from Kafka:
('cat')
('dog')
('rat')
('dog')
My goal is to compute the historical percentage of each word, so I have two RDDs: one with the historical count per word and another with the total count of all words:
values = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

def updatefunc(new_values, last_value):
    # Add this batch's counts for a key to its previous state.
    if last_value is None:
        last_value = 0
    return sum(new_values, last_value)

# Per-batch word counts.
words = values.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# Historical count per word, sorted by word.
historic = words.updateStateByKey(updatefunc) \
    .transform(lambda rdd: rdd.sortBy(lambda kv: kv[0]))

# Historical total of all counts, as a single-value DStream.
totalNo = words.map(lambda x: x[1]).reduce(lambda a, b: a + b) \
    .map(lambda x: ('totalsum', x)) \
    .updateStateByKey(updatefunc) \
    .map(lambda x: x[1])
Now I try the division ((historical value per key) / totalNo) * 100 to get the percentage of each word:
solution = historic.map(lambda x: (x[0], x[1] * 100 / totalNo))
But I get the error message:
It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063
How can I fix the value of totalNo so that I can use it inside another RDD?
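For reference, a minimal sketch of the usual way around SPARK-5063, assuming the historic DStream defined above (the helper name to_percentages is made up here): totalNo is itself a DStream, and closing over it inside historic.map() drags its SparkContext into the worker-side closure, which is exactly what the error complains about. The per-batch total has to be brought back to the driver instead, for example inside transform(), which is essentially what the answer below does:

def to_percentages(rdd):
    # rdd is one batch's (word, historicCount) RDD; this function runs on the driver.
    if rdd.isEmpty():
        return rdd
    total = rdd.map(lambda kv: kv[1]).sum()   # action: returns a plain Python number to the driver
    return rdd.map(lambda kv: (kv[0], kv[1] * 100.0 / total))

solution = historic.transform(to_percentages)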
Finally, it can be done this way:
from operator import add

# Take the Kafka message value and flatten it into individual elements
# (characters, if the value is a plain string).
words = KafkaUtils.createDirectStream(ssc, topics=['test'], kafkaParams={'bootstrap.servers': 'localhost:9092'}) \
    .map(lambda x: x[1]).flatMap(lambda x: list(x))

# Historical count per element across all batches.
historic = words.map(lambda x: (x, 1)).updateStateByKey(lambda x, y: sum(x) + (y or 0))

def func(rdd):
    # Runs on the driver for each batch: pull the total back as a plain number,
    # then divide inside the same RDD.
    if not rdd.isEmpty():
        totalNo = rdd.map(lambda x: x[1]).reduce(add)
        rdd = rdd.map(lambda x: (x[0], x[1] / totalNo))
    return rdd

solution = historic.transform(func)
solution.pprint()
Is this what you want?
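To actually run this, the stream needs a StreamingContext with checkpointing enabled, since updateStateByKey requires a checkpoint directory. A minimal driver setup might look like the sketch below; the app name, batch interval, and checkpoint path are placeholders, not from the original post:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='wordcount-percentage')   # placeholder app name
ssc = StreamingContext(sc, 5)                       # 5-second batches, chosen arbitrarily
ssc.checkpoint('/tmp/wordcount-checkpoint')         # required by updateStateByKey

# ... build words, historic and solution as above ...

ssc.start()
ssc.awaitTermination()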