PySpark: Many features to a LabeledPoint RDD
I'm new to Spark, and all of the examples I've read deal with small datasets, e.g.:
from pyspark.mllib.regression import LabeledPoint

RDD = sc.parallelize([
    LabeledPoint(1, [1.0, 2.0, 3.0]),
    LabeledPoint(2, [3.0, 4.0, 5.0]),
])
However, I have a large dataset with more than 50 features. An example row:
u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5'
I'd like to quickly create a LabeledPoint RDD in PySpark. I tried indexing the last position of each row as the label (the first argument of LabeledPoint) and the first n-1 positions as the dense feature vector, but I get the error below. Any guidance is appreciated! Note: if I change the [] to () when creating the LabeledPoint, I get an "Invalid Syntax" error.
df = myDataRDD.map(lambda line: line.split(','))
data = [
LabeledPoint(df[54], df[0:53])
]
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-67-fa1b56e8441e> in <module>()
2 df = myDataRDD.map(lambda line: line.split(','))
3 data = [
----> 4 LabeledPoint(df[54], df[0:53])
5 ]
TypeError: 'PipelinedRDD' object does not support indexing
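(For context on this traceback: map returns another RDD, and RDDs implement no __getitem__, so indexing or slicing one fails before Spark ever runs a job. A minimal reproduction, assuming an active SparkContext sc:

rdd = sc.parallelize([1, 2, 3]).map(lambda x: x)  # map yields a PipelinedRDD
rdd[0]  # raises TypeError: RDDs cannot be indexed or sliced

)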
You can't use indexing; you have to use the methods available through the Spark API. So:
data = [LabeledPoint(myDataRDD.take(myDataRDD.count()),      # last element
                     myDataRDD.top(myDataRDD.count() - 1))]  # all but last
(Untested, but that's the general idea.)
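For reference, here is what those RDD actions actually return (a quick sketch on a toy RDD; note that they give back local Python lists over the whole dataset, not a single row, so the snippet above would still need slicing):

rdd = sc.parallelize([1, 2, 3])

rdd.count()  # 3       -- the number of elements in the RDD
rdd.take(2)  # [1, 2]  -- the first n elements, as a local list
rdd.top(2)   # [3, 2]  -- the n largest elements, in descending order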
As the error you received says, you can't access an RDD by index. You need a second map step to turn each split sequence into a LabeledPoint:
rows = [u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5', u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5']
rows_rdd = sc.parallelize(rows)  # create an RDD from the given rows

labeled_points_rdd = (rows_rdd
    .map(lambda row: row.split(','))                    # split each row into a sequence of strings
    .map(lambda seq: LabeledPoint(seq[-1], seq[:-1])))  # last item is the label, the rest are the features

print(labeled_points_rdd.take(2))
# prints [LabeledPoint(5.0, [2596.0,51.0,3.0,258.0,0.0,510.0,221.0,...]),
#         LabeledPoint(5.0, [2596.0,51.0,3.0,258.0,0.0,510.0,221.0,...])]
Note that negative indexing in Python lets you access a sequence from the end, and .take(n) gives you the first n elements of an RDD.
Hope this helps.
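As a usage note, the resulting RDD can be fed straight into MLlib's RDD-based learners. A minimal sketch, assuming a decision-tree classifier and a hypothetical numClasses=8 (MLlib expects integer class labels smaller than numClasses):

from pyspark.mllib.tree import DecisionTree

# Train a multiclass decision tree on the labeled points.
# numClasses=8 is an assumption: it just needs to exceed the largest label (5 here).
model = DecisionTree.trainClassifier(labeled_points_rdd,
                                     numClasses=8,
                                     categoricalFeaturesInfo={})

# Predict on a single (hypothetical) 54-feature vector
print(model.predict([2596.0, 51.0, 3.0] + [0.0] * 51))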