在 pyspark RDD 上应用映射函数
apply a map function on pyspark RDD
我通过读取 mongodb 集合得到了一个 rdd,现在我想更改一些值,然后 update/load 将该数据返回到相同或其他集合。
mr1 = sc.mongoRDD('mongodb://localhost:27017/test_database.test2')
type(mr1) #<class 'pyspark.rdd.PipelinedRDD'>
mr1.collect()
#[{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'ravi', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'ravi', u'sal': u'3000'}]
#I want to change the name 'ravi' to 'Satya'
mr2 = mr1.map( lambda x: x['name'].replace('ravi','SATYA'))
#o/p: [u'SATYA', u'SATYA'] ##not all values
#Expected: [{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'SATYA', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'SATYA', u'sal': u'3000'}]
请帮助,如何在此处应用映射函数以获取替换名称的相同 rdd mr1。
谢谢。
尝试:
def replace(x, key, fr, to):
d = x.copy()
if key in d:
d[key] = d[key].replace('ravi','SATYA')
return d
mr1.map(lambda x: replace(x, 'name', 'ravi','SATYA'))
成功了-
def rep(x):
if x['name'] == 'ravi':
x['name']='SATYA'
return x
mr2 = mr1.map(lambda x: rep(x))
我通过读取 mongodb 集合得到了一个 rdd,现在我想更改一些值,然后 update/load 将该数据返回到相同或其他集合。
mr1 = sc.mongoRDD('mongodb://localhost:27017/test_database.test2')
type(mr1) #<class 'pyspark.rdd.PipelinedRDD'>
mr1.collect()
#[{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'ravi', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'ravi', u'sal': u'3000'}]
#I want to change the name 'ravi' to 'Satya'
mr2 = mr1.map( lambda x: x['name'].replace('ravi','SATYA'))
#o/p: [u'SATYA', u'SATYA'] ##not all values
#Expected: [{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'SATYA', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'SATYA', u'sal': u'3000'}]
请帮助,如何在此处应用映射函数以获取替换名称的相同 rdd mr1。
谢谢。
尝试:
def replace(x, key, fr, to):
d = x.copy()
if key in d:
d[key] = d[key].replace('ravi','SATYA')
return d
mr1.map(lambda x: replace(x, 'name', 'ravi','SATYA'))
成功了-
def rep(x):
if x['name'] == 'ravi':
x['name']='SATYA'
return x
mr2 = mr1.map(lambda x: rep(x))