RDD 到 LabeledPoint 的转换
RDD to LabeledPoint conversion
如果我有一个包含大约 500 列和 2 亿行的 RDD,并且 RDD.columns.indexOf("target", 0)
显示 Int = 77
这告诉我我的目标因变量位于第 77 列。但我没有关于如何 select 所需(部分)列作为特征的足够知识(假设我想要从 23 到 59、111 到 357、399 到 489 的列)。我想知道我是否可以申请:
val data = rdd.map(col => new LabeledPoint(
col(77).toDouble, Vectors.dense(??.map(x => x.toDouble).toArray))
如有任何建议或指导,我们将不胜感激。
也许我把 RDD 和 DataFrame 弄混了,我可以用 .toDF()
将 RDD 转换成 DataFrame,或者用 DataFrame 比 RDD 更容易实现目标。
我假设您的数据大致如下所示:
import scala.util.Random.{setSeed, nextDouble}
setSeed(1)
case class Record(
foo: Double, target: Double, x1: Double, x2: Double, x3: Double)
val rows = sc.parallelize(
(1 to 10).map(_ => Record(
nextDouble, nextDouble, nextDouble, nextDouble, nextDouble
))
)
val df = sqlContext.createDataFrame(rows)
df.registerTempTable("df")
sqlContext.sql("""
SELECT ROUND(foo, 2) foo,
ROUND(target, 2) target,
ROUND(x1, 2) x1,
ROUND(x2, 2) x2,
ROUND(x2, 2) x3
FROM df""").show
所以我们有如下数据:
+----+------+----+----+----+
| foo|target| x1| x2| x3|
+----+------+----+----+----+
|0.73| 0.41|0.21|0.33|0.33|
|0.01| 0.96|0.94|0.95|0.95|
| 0.4| 0.35|0.29|0.51|0.51|
|0.77| 0.66|0.16|0.38|0.38|
|0.69| 0.81|0.01|0.52|0.52|
|0.14| 0.48|0.54|0.58|0.58|
|0.62| 0.18|0.01|0.16|0.16|
|0.54| 0.97|0.25|0.39|0.39|
|0.43| 0.23|0.89|0.04|0.04|
|0.66| 0.12|0.65|0.98|0.98|
+----+------+----+----+----+
我们想忽略 foo
和 x2
并提取 LabeledPoint(target, Array(x1, x3))
:
// Map feature names to indices
val featInd = List("x1", "x3").map(df.columns.indexOf(_))
// Or if you want to exclude columns
val ignored = List("foo", "target", "x2")
val featInd = df.columns.diff(ignored).map(df.columns.indexOf(_))
// Get index of target
val targetInd = df.columns.indexOf("target")
df.rdd.map(r => LabeledPoint(
r.getDouble(targetInd), // Get target value
// Map feature indices to values
Vectors.dense(featInd.map(r.getDouble(_)).toArray)
))
如果我有一个包含大约 500 列和 2 亿行的 RDD,并且 RDD.columns.indexOf("target", 0)
显示 Int = 77
这告诉我我的目标因变量位于第 77 列。但我没有关于如何 select 所需(部分)列作为特征的足够知识(假设我想要从 23 到 59、111 到 357、399 到 489 的列)。我想知道我是否可以申请:
val data = rdd.map(col => new LabeledPoint(
col(77).toDouble, Vectors.dense(??.map(x => x.toDouble).toArray))
如有任何建议或指导,我们将不胜感激。
也许我把 RDD 和 DataFrame 弄混了,我可以用 .toDF()
将 RDD 转换成 DataFrame,或者用 DataFrame 比 RDD 更容易实现目标。
我假设您的数据大致如下所示:
import scala.util.Random.{setSeed, nextDouble}
setSeed(1)
case class Record(
foo: Double, target: Double, x1: Double, x2: Double, x3: Double)
val rows = sc.parallelize(
(1 to 10).map(_ => Record(
nextDouble, nextDouble, nextDouble, nextDouble, nextDouble
))
)
val df = sqlContext.createDataFrame(rows)
df.registerTempTable("df")
sqlContext.sql("""
SELECT ROUND(foo, 2) foo,
ROUND(target, 2) target,
ROUND(x1, 2) x1,
ROUND(x2, 2) x2,
ROUND(x2, 2) x3
FROM df""").show
所以我们有如下数据:
+----+------+----+----+----+
| foo|target| x1| x2| x3|
+----+------+----+----+----+
|0.73| 0.41|0.21|0.33|0.33|
|0.01| 0.96|0.94|0.95|0.95|
| 0.4| 0.35|0.29|0.51|0.51|
|0.77| 0.66|0.16|0.38|0.38|
|0.69| 0.81|0.01|0.52|0.52|
|0.14| 0.48|0.54|0.58|0.58|
|0.62| 0.18|0.01|0.16|0.16|
|0.54| 0.97|0.25|0.39|0.39|
|0.43| 0.23|0.89|0.04|0.04|
|0.66| 0.12|0.65|0.98|0.98|
+----+------+----+----+----+
我们想忽略 foo
和 x2
并提取 LabeledPoint(target, Array(x1, x3))
:
// Map feature names to indices
val featInd = List("x1", "x3").map(df.columns.indexOf(_))
// Or if you want to exclude columns
val ignored = List("foo", "target", "x2")
val featInd = df.columns.diff(ignored).map(df.columns.indexOf(_))
// Get index of target
val targetInd = df.columns.indexOf("target")
df.rdd.map(r => LabeledPoint(
r.getDouble(targetInd), // Get target value
// Map feature indices to values
Vectors.dense(featInd.map(r.getDouble(_)).toArray)
))