ValueError: too many values to unpack (while reducing with foldByKey)
I get a "ValueError: too many values to unpack" error when running the code below, which is intended to build a histogram of values for each key:
%pyspark
import datetime
from pyspark.sql import SQLContext, Row

def featVecSms( x ):
    sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
    hourOfDay = int( sttm.strftime( '%H' ) )
    dayOfWeek = int( sttm.strftime( '%w' ) )
    dayOfMonth = int( sttm.strftime( '%d' ) )
    duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
    duration = duration.total_seconds()
    service = x[3]
    resultCode = int( x[4] )
    msc = x[5]
    actionMap = {
        "0":'fsm',
        "1":'fsm',
        "2000":'sri',
        "2001":'sri',
        "2100":'sri',
        "2101":'sri',
        "2102":'fsm',
        "2200":'sri',
        "2201":'sri',
        "2202":'fsm',
        "2203":'fsm',
        "2204":'fsm',
        "2205":'fsm',
        "2206":'fsm',
        "2207":'sri',
        "2208":'sri',
        "2209":'sri',
        "2210":'fsm',
        "2211":'fsm',
        "2212":'fsm',
        "2213":'fsm',
        "2214":'fsm',
        "2215":'sri',
        "2216":'fsm'
    }
    action = actionMap.get( x[4] )
    return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action )

textFile = sc.textFile("/export/sampleMsesAll.txt")
enTuples = textFile.map(lambda x: x.split("', u'"))
msRec = enTuples.map( featVecSms )

def countByCrit( accVal, currVal, idx ):
    accVal[ int( currVal[ idx ] ) ] = accVal( [ int( currVal[ idx ] ) ] ) + 1
    return accVal

def countByTod( accVal, currVal ):
    return countByCrit( accVal, currVal, 1 )

todmap = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
#.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )
msTodSuccess.collect()
It throws the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times, most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
merger.mergeValues(iterator)
File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
for k, v in iterator:
ValueError: too many values to unpack
The data looks like this:
$ head -15 /export/sampleMses10M.txt/part-00000
(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
(u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
(u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
(u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
(u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')
The sample is only 123k; the real application will have tens of millions of records.
The problem with your code is a type error.

First of all, the *byKey methods operate on PairwiseRDDs. In Python that means an RDD whose elements are tuples of length 2 (or other structures we can call pairs), which can be unpacked like this:

k, v = pair

msRec, whose elements have length 9, obviously won't work here.
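As a minimal, hypothetical sketch (assuming you want to key the records by the first field and keep the remaining eight features as the value), you could first turn msRec into a pair RDD:

# Hypothetical: key by x[0], keep the remaining features as a value tuple
msPairs = msRec.map( lambda x: ( x[0], x[1:] ) )

Each element is then a (key, value) pair that the *byKey methods can unpack.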
The second problem is that you are using the wrong transformation. Let's take a look at the signature of foldByKey in Scala:

def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

where V is the type of the values (RDD[(K, V)]). As you can see, both zeroValue and the return type of the function have to be the same as the value type, which is clearly not the case here.
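For instance, a hypothetical use of foldByKey that does type-check (using the msPairs pair RDD sketched above, where duration sits at index 3 of the value tuple) would be summing durations per key, because the zero value, the inputs and the result are all floats:

# Hypothetical: foldByKey works here because zero, values and result share one type
durations = msPairs.mapValues( lambda v: v[3] )              # duration in seconds
totalDuration = durations.foldByKey( 0.0, lambda a, b: a + b )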
If the result type differs from the input type, you should use combineByKey or aggregateByKey instead.
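For example, a rough, untested sketch of the hour-of-day histogram with aggregateByKey (again assuming the msPairs pair RDD from above, with hourOfDay at index 0 of the value tuple) could look like this:

# Rough sketch: per-key hour-of-day histogram with aggregateByKey.
# aggregateByKey lets the result type (a 24-element list of counts)
# differ from the value type (the feature tuple).
def addHour( hist, value ):
    hist[ value[0] ] += 1      # value[0] is assumed to be hourOfDay (0-23)
    return hist

def mergeHists( a, b ):
    return [ x + y for x, y in zip( a, b ) ]

msTod = msPairs.aggregateByKey( [0] * 24, addHour, mergeHists )
msTod.collect()

Here the zero value, the per-value function and the merge function together turn feature tuples into histogram counts, which foldByKey cannot do because it requires a single type V throughout.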