Perform group aggregation to populate field values in an RDD
I have to write code that first loads a text file into an RDD and then fills in the empty cells based on the VIN.
The CSV file is:
'1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors',
'2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors',
'3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision',
'4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission',
'5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax',
'6,R,VOME254OOXW344325,,,,2015-02-06,Wheel alignment service',
'7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light',
'8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars',
'9,A,VOME254OOXW344325,,,,2015-10-01,Side collision',
'10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires',
'11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine',
'12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover',
'13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door',
'14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax',
'15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax',
'16,A,INU45KIOOPA343980,,,,2020-05-01,Side collision'
I wrote:
def extract_vin_key_value(line):
    # sr[2] is the VIN, sr[3] the make, sr[5] the year
    sr = line.split(',')
    # Key each record by VIN and keep (make, year) as the value
    return (sr[2], (sr[3], sr[5]))

raw_rdd = sc.textFile('/FileStore/tables/data.csv')
vin_kv = raw_rdd.map(extract_vin_key_value)
vin_kv.collect()
The result of the first part is:
[('VXIO456XLBB630221', ('Nissan', '2003')),
('INU45KIOOPA343980', ('Mercedes', '2015')),
('VXIO456XLBB630221', ('', '')),
('VXIO456XLBB630221', ('', '')),
('VOME254OOXW344325', ('Mercedes', '2015')),
('VOME254OOXW344325', ('', '')),
('VXIO456XLBB630221', ('', '')),
('EXOA00341AB123456', ('Mercedes', '2016')),
('VOME254OOXW344325', ('', '')),
('VOME254OOXW344325', ('', '')),
('EXOA00341AB123456', ('', '')),
('EXOA00341AB123456', ('', '')),
('VOME254OOXW344325', ('', '')),
('UXIA769ABCC447906', ('Toyota', '2017')),
('UXIA769ABCC447906', ('', '')),
('INU45KIOOPA343980', ('', ''))]
Now I have to populate the make and model based on the VIN by developing a method:
def populate_make(line):
    vin = line[0]
    ...
    ...
enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))
enhance_make.collect()
I just got the following error message:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed
1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 20) (ip-10-172-237-160.us-west-
2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 'TypeError:
'ResultIterable' object is not subscriptable
How do I split the data in the second function? I tried the split method but got:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed
1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 25) (ip-10-172-237-160.us-west-
2.compute.internal executor driver): org.apache.spark.api.python.PythonException:
'AttributeError: 'ResultIterable' object has no attribute 'split'
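groupByKey returns pairs of type (Any, ResultIterable). A ResultIterable supports iteration and len(), but not indexing or string methods such as split, which is why both attempts fail. The docstring briefly explains why this type exists:
A special result iterable. This is used because the standard iterator can not be pickled
I haven't worked out what that means yet; I'll look into it. In any case, the fix is to convert the ResultIterable to a list: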
enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(list(kv[1])))
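The body of populate_make is elided in the question, so what follows is only a sketch of one way it could fill the blanks, not the asker's actual implementation. It assumes each VIN has at most one distinct non-blank (make, year) tuple, and it only iterates over its argument, so it should work on a ResultIterable as well as on a list. The group_by_key helper and the sample list are hypothetical stand-ins that let the logic run without a Spark cluster.

```python
from collections import defaultdict


def populate_make(values):
    """Replace blank (make, year) tuples with the group's non-blank tuple.

    `values` is any iterable of (make, year) tuples belonging to one VIN.
    Assumption: a VIN has at most one distinct non-blank tuple.
    """
    values = list(values)  # materialise once so we can pass over it twice
    # First tuple with a non-empty make, or blanks if the group has none
    known = next((v for v in values if v[0]), ('', ''))
    return [v if v[0] else known for v in values]


def group_by_key(pairs):
    """Hypothetical local stand-in for RDD.groupByKey, for illustration only."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups.items()


# A few records in the same shape as vin_kv.collect() above
sample = [
    ('VXIO456XLBB630221', ('Nissan', '2003')),
    ('INU45KIOOPA343980', ('Mercedes', '2015')),
    ('VXIO456XLBB630221', ('', '')),
    ('INU45KIOOPA343980', ('', '')),
]

filled = [(vin, value)
          for vin, values in group_by_key(sample)
          for value in populate_make(values)]
print(filled)

# With a live SparkContext, the equivalent call would presumably be:
#   enhance_make = vin_kv.groupByKey().flatMapValues(populate_make)
# flatMapValues keeps the VIN as the key, whereas passing only kv[1]
# to populate_make inside flatMap drops it.
```

Note that in the question populate_make receives kv[1], the grouped values only, so line[0] inside it would be the first (make, year) tuple rather than the VIN. If the VIN is needed in the output, either pass the whole kv pair or use flatMapValues as in the comment above.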