使用 Pyspark 命名变量

Naming Variables using Pyspark

尽管我的问题很简单,但由于我是 spark 的新手,所以在解决问题时遇到了问题。

针对我的问题的正常 python 查询如下:

for line in file('schedule.txt'):
  origin,dest,depart,arrive,price=line.split(',')

我可以阅读文件

sched=sc.textFile('/PATH/schedule.txt')

但是当我尝试以下代码时:

  origin,dest,depart,arrive,price=sched.split(',')

我收到这个错误:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-46-ba0e8c07ca89> in <module>()
----> 1 origin,dest,depart,arrive,price=sched.split(',')

AttributeError: 'RDD' object has no attribute 'split'

我可以使用 lambda 函数拆分文件。但是不知道如何创建这 5 个变量名。

如果有人可以帮助我。

sched=sc.textFile('/PATH/schedule.txt') returns 一个 RDD 与 python 文件对象不同的数据类型,支持不同的 API。您的 python 代码相当于:

sched=sc.textFile('/PATH/schedule.txt')
# extract values
vals = sched.map(lambda line:line.split(','))
# now you can do some processing, for example sum price
price = vals.reduce(lambda v1,v2:v1[4]+v2[4])
# or just collect the raw values
raw_vals = vals.collect()

更新: 如果您希望能够将每一行的值作为局部变量访问,您可以定义一个专用函数而不只是一个 lambda 并将其传递给 .map():

def process_line(line):
    origin,dest,depart,arrive,price=line.split(',')
    # do whatever
    # remember to return a result

sche.map(process_line)

更新2:

您要对文件进行的具体处理并不简单,因为它需要写入共享变量 (flights)。相反,我建议按 orig,dest 对行进行分组,然后收集结果并插入字典:

flights_data = sched.map(lambda line: ((line[0],line[1]),tuple(line[2:]))).groupByKey().collect()
flights = {f:ds for f,ds in flights_data}