PySpark 中 reduce 的正确输入
Proper input for reduce in PySpark
我正在尝试使用 spark 离散化一些数据。
我有以下格式的数据:
date zip amount
2013/04/02 04324 32.2
2013/04/01 23242 1.5
2013/04/02 99343 12
然后我有下面的代码:
sampleTable = sqlCtx.inferSchema(columns)
sampleTable.registerAsTable("amounts")
exTable = sampleTable.map(lambda p: {"date":p.date,"zip":p.zip,"amount":p.amount})
然后我有一个函数可以离散化:
def discretize((key, data), cutoff=0.75):
result = (data < np.percentile(index,cutoff))
return result
我将采用此结果列,稍后将其与原始数据集连接。
我正在尝试使用此语句执行操作:
exDiscretized = exTable.map(lambda x: (((dt.datetime.strptime(x.date,'%Y/%m/%d')).year, (dt.datetime.strptime(x.date,'%Y/%m/%d')).month), x.amount)).reduce(discretize).collect()
本质上,我想要一个((年,月),整行)的元组,这样我就可以找到每个月和年组合的第 75 个百分位数。
我可以让地图部分正常工作。当我取出 reduce 部分时,我的代码就可以工作了。
当我 运行 同时包含 map 和 reduce 的语句时,出现以下错误:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/rdd.py", line 715, in func
yield reduce(f, iterator, initial)
File "<stdin>", line 2, in discretize
File "/usr/local/lib/python2.7/dist-packages/numpy-1.9.1-py2.7-linux-x86_64.egg/numpy/lib/function_base.py", line 3051, in percentile
q = array(q, dtype=np.float64, copy=True)
ValueError: setting an array element with a sequence.
我不确定我做错了什么。也许这与我生成键值对的方式有关?
所以我认为问题的根源在于 reduce 无法按照您尝试使用的方式工作。由于您希望将单个键的所有数据放在一起,因此函数 groupByKey 可能就是您正在寻找的那个。这是一个例子:
input = sc.parallelize([("hi", 1), ("bye", 0), ("hi", 3)])
groupedInput = input.groupByKey()
def top(x):
data = list(x)
percentile = np.percentile(data, 0.70)
return filter(lambda x: x >= percentile , data)
modifiedGroupedInput = groupedInput.mapValues(top)
modifiedGroupedInput.collect()
结果:
[('bye', [0]), ('hi', [3])]
一般来说,reduceByKey 通常更好用,但是由于您想同时考虑每个键的所有元素来计算
我正在尝试使用 spark 离散化一些数据。
我有以下格式的数据:
date zip amount
2013/04/02 04324 32.2
2013/04/01 23242 1.5
2013/04/02 99343 12
然后我有下面的代码:
sampleTable = sqlCtx.inferSchema(columns)
sampleTable.registerAsTable("amounts")
exTable = sampleTable.map(lambda p: {"date":p.date,"zip":p.zip,"amount":p.amount})
然后我有一个函数可以离散化:
def discretize((key, data), cutoff=0.75):
result = (data < np.percentile(index,cutoff))
return result
我将采用此结果列,稍后将其与原始数据集连接。
我正在尝试使用此语句执行操作:
exDiscretized = exTable.map(lambda x: (((dt.datetime.strptime(x.date,'%Y/%m/%d')).year, (dt.datetime.strptime(x.date,'%Y/%m/%d')).month), x.amount)).reduce(discretize).collect()
本质上,我想要一个((年,月),整行)的元组,这样我就可以找到每个月和年组合的第 75 个百分位数。
我可以让地图部分正常工作。当我取出 reduce 部分时,我的代码就可以工作了。
当我 运行 同时包含 map 和 reduce 的语句时,出现以下错误:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/rdd.py", line 715, in func
yield reduce(f, iterator, initial)
File "<stdin>", line 2, in discretize
File "/usr/local/lib/python2.7/dist-packages/numpy-1.9.1-py2.7-linux-x86_64.egg/numpy/lib/function_base.py", line 3051, in percentile
q = array(q, dtype=np.float64, copy=True)
ValueError: setting an array element with a sequence.
我不确定我做错了什么。也许这与我生成键值对的方式有关?
所以我认为问题的根源在于 reduce 无法按照您尝试使用的方式工作。由于您希望将单个键的所有数据放在一起,因此函数 groupByKey 可能就是您正在寻找的那个。这是一个例子:
input = sc.parallelize([("hi", 1), ("bye", 0), ("hi", 3)])
groupedInput = input.groupByKey()
def top(x):
data = list(x)
percentile = np.percentile(data, 0.70)
return filter(lambda x: x >= percentile , data)
modifiedGroupedInput = groupedInput.mapValues(top)
modifiedGroupedInput.collect()
结果:
[('bye', [0]), ('hi', [3])]
一般来说,reduceByKey 通常更好用,但是由于您想同时考虑每个键的所有元素来计算