Spark Accumulator not working
I want to use an accumulator to count the number of closed orders in this data, but it gives me the wrong answer: just zero (0). What is the problem? I am using the Hortonworks sandbox. The code is below, and I am running it with spark-submit.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('closedcount')
sc = SparkContext(conf=conf)
rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)

def is_closed(N_closed, line):
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)
    return status

closedRDD = rdd.filter(lambda x: is_closed(N_closed, x))
print('The answer is ' + str(N_closed.value))
But when I submit it, I get zero:
spark-submit --master yarn closedCounter.py
Update:
Now that I have changed my code, it works. Is this the correct way to do it?
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('closedcount')
sc = SparkContext(conf=conf)
rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)

def is_closed(line):
    global N_closed
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)

rdd.foreach(is_closed)
print('The answer is ' + str(N_closed.value))
Second update:
I understand it now: in a Jupyter Notebook, without YARN, it gives me the correct answer because I call an action (count) before checking the value of the accumulator.
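The same lazy-evaluation effect can be seen in plain Python with a generator expression (an analogy only, not Spark itself; the names `counter` and `is_closed` here are illustrative): the side effect inside the lazy pipeline does not happen until something consumes it, just as the accumulator inside `filter()` is not updated until an action runs on the RDD.

```python
# Plain-Python analogy for Spark's lazy transformations: a generator
# expression is not evaluated when it is defined, only when consumed.
counter = {"closed": 0}  # stands in for the Spark accumulator

def is_closed(line):
    status = line.split(",")[-1] == "CLOSED"
    if status:
        counter["closed"] += 1
    return status

lines = ["1,CLOSED", "2,OPEN", "3,CLOSED"]

# "Transformation": builds the lazy pipeline, runs nothing yet.
lazy = (line for line in lines if is_closed(line))
assert counter["closed"] == 0   # still zero, like reading N_closed.value after filter()

# "Action": consuming the generator executes the pipeline.
closed = list(lazy)
assert counter["closed"] == 2   # now the counter reflects the data
```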
Computations inside transformations are evaluated lazily, so unless an action happens on an RDD, the transformations are not executed. As a result, accumulators used inside functions like map() or filter() won't get updated unless some action happens on the RDD.
https://www.edureka.co/blog/spark-accumulators-explained
(The examples there are in Scala.)
But basically, you need to perform an action on rdd, for example:
N_closed = sc.accumulator(0)

def is_closed(line):
    status = line.split(",")[-1] == "CLOSED"
    if status:
        N_closed.add(1)
    return status

rdd.foreach(is_closed)
print('The answer is ' + str(N_closed.value))