如何在pyspark中解析csv格式的元组数据?
How to parse tuple data from csv format in pyspark?
数据集为 CSV 格式。此文件中的每一行都包含一个元组,其中第一个元素是植物的名称,其余元素是植物所在的州。
示例:
abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi
我如何解析它并创建一个包含列 plants 和 states 的数据框(这将包含除植物之外的所有数据,如列表)?
您可以使用 rdd.map()
或使用 DataFrames 和 udf()
:
RDD
首先创建一个示例数据集:
text = """abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi"""
rdd = sc.parallelize(map(lambda x: (x,), text.split("\n")))
rdd.toDF(["rawText"]).show(truncate=False)
#+--------------------------------------------------------+
#|rawText |
#+--------------------------------------------------------+
#|abelia,fl,nc |
#|abelia x grandiflora,fl,nc |
#|abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi|
#+--------------------------------------------------------+
现在使用 map()
两次。首先通过拆分 ,
将每条记录映射到列表。第二个将拆分后的字符串映射为 (x[0], x[1:])
:
形式的元组
rdd.map(lambda x: x[0].split(','))\
.map(lambda x: (x[0], x[1:]))\
.toDF(["plant", "states"])\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+
您也可以在 map()
的一次调用中完成此操作,但为了便于阅读,我将其分成两部分。
数据框
import pyspark.sql.functions as f
df = sqlCtx.createDataFrame(map(lambda x: (x,), text.split("\n")), ["rawText"])
# define udf to split a string on comma and return all
# of the elements except the first one
get_states = f.udf(lambda x: x.split(',')[1:], ArrayType(StringType()))
df.withColumn('plant', f.split('rawText', ',')[0])\
.withColumn('states', get_states('rawText'))\
.select('plant', 'states')\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+
数据集为 CSV 格式。此文件中的每一行都包含一个元组,其中第一个元素是植物的名称,其余元素是植物所在的州。
示例:
abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi
我如何解析它并创建一个包含列 plants 和 states 的数据框(这将包含除植物之外的所有数据,如列表)?
您可以使用 rdd.map()
或使用 DataFrames 和 udf()
:
RDD
首先创建一个示例数据集:
text = """abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi"""
rdd = sc.parallelize(map(lambda x: (x,), text.split("\n")))
rdd.toDF(["rawText"]).show(truncate=False)
#+--------------------------------------------------------+
#|rawText |
#+--------------------------------------------------------+
#|abelia,fl,nc |
#|abelia x grandiflora,fl,nc |
#|abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi|
#+--------------------------------------------------------+
现在使用 map()
两次。首先通过拆分 ,
将每条记录映射到列表。第二个将拆分后的字符串映射为 (x[0], x[1:])
:
rdd.map(lambda x: x[0].split(','))\
.map(lambda x: (x[0], x[1:]))\
.toDF(["plant", "states"])\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+
您也可以在 map()
的一次调用中完成此操作,但为了便于阅读,我将其分成两部分。
数据框
import pyspark.sql.functions as f
df = sqlCtx.createDataFrame(map(lambda x: (x,), text.split("\n")), ["rawText"])
# define udf to split a string on comma and return all
# of the elements except the first one
get_states = f.udf(lambda x: x.split(',')[1:], ArrayType(StringType()))
df.withColumn('plant', f.split('rawText', ',')[0])\
.withColumn('states', get_states('rawText'))\
.select('plant', 'states')\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+