如何在pyspark中解析csv格式的元组数据?

How to parse tuple data from csv format in pyspark?

数据集为 CSV 格式。此文件中的每一行都包含一个元组,其中第一个元素是植物的名称,其余元素是植物所在的州。

示例:

abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi

我如何解析它并创建一个包含列 plants 和 states 的数据框(这将包含除植物之外的所有数据,如列表)?

您可以使用 rdd.map() 或使用 DataFrames 和 udf():

RDD

首先创建一个示例数据集:

text = """abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi"""

rdd = sc.parallelize(map(lambda x: (x,), text.split("\n")))
rdd.toDF(["rawText"]).show(truncate=False)
#+--------------------------------------------------------+
#|rawText                                                 |
#+--------------------------------------------------------+
#|abelia,fl,nc                                            |
#|abelia x grandiflora,fl,nc                              |
#|abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi|
#+--------------------------------------------------------+

现在使用 map() 两次。首先通过拆分 , 将每条记录映射到列表。第二个将拆分后的字符串映射为 (x[0], x[1:]):

形式的元组
rdd.map(lambda x: x[0].split(','))\
    .map(lambda x: (x[0], x[1:]))\
    .toDF(["plant", "states"])\
    .show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant               |states                                                      |
#+--------------------+------------------------------------------------------------+
#|abelia              |[fl, nc]                                                    |
#|abelia x grandiflora|[fl, nc]                                                    |
#|abelmoschus         |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+

您也可以在 map() 的一次调用中完成此操作,但为了便于阅读,我将其分成两部分。

数据框

import pyspark.sql.functions as f
df = sqlCtx.createDataFrame(map(lambda x: (x,), text.split("\n")), ["rawText"])

# define udf to split a string on comma and return all
# of the elements except the first one
get_states = f.udf(lambda x: x.split(',')[1:], ArrayType(StringType()))

df.withColumn('plant', f.split('rawText', ',')[0])\
    .withColumn('states', get_states('rawText'))\
    .select('plant', 'states')\
    .show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant               |states                                                      |
#+--------------------+------------------------------------------------------------+
#|abelia              |[fl, nc]                                                    |
#|abelia x grandiflora|[fl, nc]                                                    |
#|abelmoschus         |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+