Spark:如何将 RDD 转换为 Seq 以在管道中使用
Spark: How to transform a RDD to Seq to be used in pipeline
我想使用 MLlib 中的管道实现。之前,我有一个 RDD 文件并将其传递给模型创建,但现在要使用管道,应该有 LabeledDocument 序列传递给管道。
我有如下创建的 RDD:
val data = sc.textFile("/test.csv");
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail))
}.cache()
在管道示例Spark Programming Guide中,管道需要以下数据:
// Prepare training documents, which are labeled.
val training = sparkContext.parallelize(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 0.0),
LabeledDocument(2L, "spark f g h", 1.0),
LabeledDocument(3L, "hadoop mapreduce", 0.0),
LabeledDocument(4L, "b spark who", 1.0),
LabeledDocument(5L, "g d a y", 0.0),
LabeledDocument(6L, "spark fly", 1.0),
LabeledDocument(7L, "was mapreduce", 0.0),
LabeledDocument(8L, "e spark program", 1.0),
LabeledDocument(9L, "a e c l", 0.0),
LabeledDocument(10L, "spark compile", 1.0),
LabeledDocument(11L, "hadoop software", 0.0)))
我需要一种方法将我的 RDD (parsedData) 更改为 LabeledDocuments 序列(如示例中的训练)。
感谢您的帮助。
我找到了这个问题的答案。
我可以通过以下代码将我的 RDD (parsedData) 转换为 SchemaRDD,它是 LabeledDocuments 的序列:
val rddSchema = parsedData.toSchemaRDD;
现在问题改了!我想将新的 rddSchema 拆分为训练 (80%) 和测试 (20%)。如果我使用 randomSplit,它 returns 一个 Array[RDD[Row]] 而不是 SchemaRDD.
新问题:如何将Array[RDD[Row]]转换为SchemaRDD——或者——如何分割SchemaRDD,其中结果为SchemaRDDs?
我尝试在 pyspark 中关注-
def myFunc(s):
# words = s.split(",")
s = re.sub("\"", "", s)
words = [s for s in s.split(",")]
val = words[0]
lbl = 0.0
if val == 4 or val == "4":
lbl = 0.0
elif val == 0 or val == "0":
lbl = 1.0
cleanlbl = cleanLine(words[5], True, val)
# print "cleanlblcleanlbl ",cleanlbl
return LabeledPoint(lbl, htf.transform(cleanlbl.split(" ")))
sparseList = sc.textFile("hdfs:///stats/training.1600000.processed.noemoticon.csv").map(myFunc)
sparseList.cache() # Cache data since Logistic Regression is an iterative algorithm.
# for data in dataset:
trainfeats, testfeats = sparseList.randomSplit([0.8, 0.2], 10)
你可以边解析边拆分,你可以根据自己的需要进行修改
我想使用 MLlib 中的管道实现。之前,我有一个 RDD 文件并将其传递给模型创建,但现在要使用管道,应该有 LabeledDocument 序列传递给管道。
我有如下创建的 RDD:
val data = sc.textFile("/test.csv");
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail))
}.cache()
在管道示例Spark Programming Guide中,管道需要以下数据:
// Prepare training documents, which are labeled.
val training = sparkContext.parallelize(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 0.0),
LabeledDocument(2L, "spark f g h", 1.0),
LabeledDocument(3L, "hadoop mapreduce", 0.0),
LabeledDocument(4L, "b spark who", 1.0),
LabeledDocument(5L, "g d a y", 0.0),
LabeledDocument(6L, "spark fly", 1.0),
LabeledDocument(7L, "was mapreduce", 0.0),
LabeledDocument(8L, "e spark program", 1.0),
LabeledDocument(9L, "a e c l", 0.0),
LabeledDocument(10L, "spark compile", 1.0),
LabeledDocument(11L, "hadoop software", 0.0)))
我需要一种方法将我的 RDD (parsedData) 更改为 LabeledDocuments 序列(如示例中的训练)。
感谢您的帮助。
我找到了这个问题的答案。
我可以通过以下代码将我的 RDD (parsedData) 转换为 SchemaRDD,它是 LabeledDocuments 的序列:
val rddSchema = parsedData.toSchemaRDD;
现在问题改了!我想将新的 rddSchema 拆分为训练 (80%) 和测试 (20%)。如果我使用 randomSplit,它 returns 一个 Array[RDD[Row]] 而不是 SchemaRDD.
新问题:如何将Array[RDD[Row]]转换为SchemaRDD——或者——如何分割SchemaRDD,其中结果为SchemaRDDs?
我尝试在 pyspark 中关注-
def myFunc(s):
# words = s.split(",")
s = re.sub("\"", "", s)
words = [s for s in s.split(",")]
val = words[0]
lbl = 0.0
if val == 4 or val == "4":
lbl = 0.0
elif val == 0 or val == "0":
lbl = 1.0
cleanlbl = cleanLine(words[5], True, val)
# print "cleanlblcleanlbl ",cleanlbl
return LabeledPoint(lbl, htf.transform(cleanlbl.split(" ")))
sparseList = sc.textFile("hdfs:///stats/training.1600000.processed.noemoticon.csv").map(myFunc)
sparseList.cache() # Cache data since Logistic Regression is an iterative algorithm.
# for data in dataset:
trainfeats, testfeats = sparseList.randomSplit([0.8, 0.2], 10)
你可以边解析边拆分,你可以根据自己的需要进行修改