rdd(pyspark)索引中的逗号分隔数据超出范围问题

Comma separated data in rdd (pyspark) indices out of bound problem

我有一个逗号分隔的 csv 文件。其中一列的数据再次以逗号分隔。该特定列中的每一行都有不同的单词数,因此逗号数不同。当我访问或执行任何类型的操作(如过滤)时(拆分数据后),它会在 pyspark 中引发错误。我该如何处理此类数据?例如,其中一列是颜色,表示每个条目的数据不同,1. 红色,蓝色 2. 红色,蓝色,橙色。拆分后,每一行的下一列索引 change/shift。

数据为表格形式

|id| category|color            |price|
--------------------------------------
|1 | a       |red,blue         | 2000|
--------------------------------------
|2 | b       | black           | 5000|
--------------------------------------
|3 | c       |green,black,blue | 3000|

数据以逗号分隔,因此通过文本编辑器打开时显示如下。

id,category,color,price
1,a,red,blue,2000
2,b,black,5000
3,c,green,black,blue,3000

我试过进行以下操作,均无效。如何处理这些数据。

a.filter(lambda x : 'id' not in x).filter(lambda x: (x.split(",")[4]=='2000')).map(lambda x: x.split(",")[1]).collect() 

a.filter(lambda x : 'id' not in x).filter(lambda x: (x.split(",")[3]=='2000')).map(lambda x: x.split(",")[1]).collect() 

像这样的东西应该可以工作:

# Read data and add a row index
rdd = sc.textFile("example.txt").zipWithIndex()

# Get first row - columns
columns = rdd.filter(lambda x: x[1] == 0).map(lambda x: x[0].split(",")).collect()[0]

# Get actual data - all the other rows
data = rdd.filter(lambda x: x[1] > 0).map(lambda x: x[0].split(","))

# Split out data rows into fields and covert to a DF
data = data.map(lambda x: (x[0], x[1], ",".join(x[2:-1]), x[-1])).toDF(schema=columns)

data.show()

+---+--------+----------------+-----+
| id|category|           color|price|
+---+--------+----------------+-----+
|  1|       a|        red,blue| 2000|
|  2|       b|           black| 5000|
|  3|       c|green,black,blue| 3000|
+---+--------+----------------+-----+