使用 Pyspark 命名变量
Naming Variables using Pyspark
尽管我的问题很简单,但由于我是 spark 的新手,所以在解决问题时遇到了问题。
针对我的问题的正常 python 查询如下:
for line in file('schedule.txt'):
origin,dest,depart,arrive,price=line.split(',')
我可以阅读文件
sched=sc.textFile('/PATH/schedule.txt')
但是当我尝试以下代码时:
origin,dest,depart,arrive,price=sched.split(',')
我收到这个错误:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-46-ba0e8c07ca89> in <module>()
----> 1 origin,dest,depart,arrive,price=sched.split(',')
AttributeError: 'RDD' object has no attribute 'split'
我可以使用 lambda 函数拆分文件。但是不知道如何创建这 5 个变量名。
如果有人可以帮助我。
sched=sc.textFile('/PATH/schedule.txt')
returns 一个 RDD
与 python 文件对象不同的数据类型,支持不同的 API。您的 python 代码相当于:
sched=sc.textFile('/PATH/schedule.txt')
# extract values
vals = sched.map(lambda line:line.split(','))
# now you can do some processing, for example sum price
price = vals.reduce(lambda v1,v2:v1[4]+v2[4])
# or just collect the raw values
raw_vals = vals.collect()
更新:
如果您希望能够将每一行的值作为局部变量访问,您可以定义一个专用函数而不只是一个 lambda 并将其传递给 .map()
:
def process_line(line):
origin,dest,depart,arrive,price=line.split(',')
# do whatever
# remember to return a result
sche.map(process_line)
更新2:
您要对文件进行的具体处理并不简单,因为它需要写入共享变量 (flights
)。相反,我建议按 orig,dest
对行进行分组,然后收集结果并插入字典:
flights_data = sched.map(lambda line: ((line[0],line[1]),tuple(line[2:]))).groupByKey().collect()
flights = {f:ds for f,ds in flights_data}
尽管我的问题很简单,但由于我是 spark 的新手,所以在解决问题时遇到了问题。
针对我的问题的正常 python 查询如下:
for line in file('schedule.txt'):
origin,dest,depart,arrive,price=line.split(',')
我可以阅读文件
sched=sc.textFile('/PATH/schedule.txt')
但是当我尝试以下代码时:
origin,dest,depart,arrive,price=sched.split(',')
我收到这个错误:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-46-ba0e8c07ca89> in <module>()
----> 1 origin,dest,depart,arrive,price=sched.split(',')
AttributeError: 'RDD' object has no attribute 'split'
我可以使用 lambda 函数拆分文件。但是不知道如何创建这 5 个变量名。
如果有人可以帮助我。
sched=sc.textFile('/PATH/schedule.txt')
returns 一个 RDD
与 python 文件对象不同的数据类型,支持不同的 API。您的 python 代码相当于:
sched=sc.textFile('/PATH/schedule.txt')
# extract values
vals = sched.map(lambda line:line.split(','))
# now you can do some processing, for example sum price
price = vals.reduce(lambda v1,v2:v1[4]+v2[4])
# or just collect the raw values
raw_vals = vals.collect()
更新:
如果您希望能够将每一行的值作为局部变量访问,您可以定义一个专用函数而不只是一个 lambda 并将其传递给 .map()
:
def process_line(line):
origin,dest,depart,arrive,price=line.split(',')
# do whatever
# remember to return a result
sche.map(process_line)
更新2:
您要对文件进行的具体处理并不简单,因为它需要写入共享变量 (flights
)。相反,我建议按 orig,dest
对行进行分组,然后收集结果并插入字典:
flights_data = sched.map(lambda line: ((line[0],line[1]),tuple(line[2:]))).groupByKey().collect()
flights = {f:ds for f,ds in flights_data}