遍历 Spark RDD
Iterating through a Spark RDD
从 Spark DataFrame 开始创建向量矩阵以进行进一步的分析处理。
feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()
输出是一个向量数组。其中一些向量中有一个 null
>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])
据此我想遍历向量矩阵并创建一个 LabeledPoint 数组,如果向量包含空值,则为 0(零),否则为 1。
def f(row):
if row.contain(None):
LabeledPoint(1.0,row)
else:
LabeledPoint(0.0,row)
我尝试使用
遍历向量矩阵
feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums
next(feature_matrix_labeledPoint) # Run the iteration protocol
但这不起作用。
TypeError: 'PipelinedRDD' object is not iterable
任何帮助都会很棒
RDDs
不是 Python 列表的替代品。您必须使用给定 RDD
上可用的操作或转换。在这里你可以简单地使用 map
:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
feature_matrix_vectors = sc.parallelize([
DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])
(feature_matrix_vectors
.map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
.collect())
从 Spark DataFrame 开始创建向量矩阵以进行进一步的分析处理。
feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()
输出是一个向量数组。其中一些向量中有一个 null
>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])
据此我想遍历向量矩阵并创建一个 LabeledPoint 数组,如果向量包含空值,则为 0(零),否则为 1。
def f(row):
if row.contain(None):
LabeledPoint(1.0,row)
else:
LabeledPoint(0.0,row)
我尝试使用
遍历向量矩阵feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums
next(feature_matrix_labeledPoint) # Run the iteration protocol
但这不起作用。
TypeError: 'PipelinedRDD' object is not iterable
任何帮助都会很棒
RDDs
不是 Python 列表的替代品。您必须使用给定 RDD
上可用的操作或转换。在这里你可以简单地使用 map
:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
feature_matrix_vectors = sc.parallelize([
DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])
(feature_matrix_vectors
.map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
.collect())