Spark map dataframe to array
I'm using the Spark MLlib PrefixSpan algorithm. I had code that worked in Spark 1.6, but we recently moved to Spark 2.2.
I have a DataFrame like this:
viewsPurchasesGrouped: org.apache.spark.sql.DataFrame = [session_id: decimal(29,0), view_product_ids: array<bigint> ... 1 more field]
root
|-- session_id: decimal(29,0) (nullable = true)
|-- view_product_ids: array (nullable = true)
| |-- element: long (containsNull = true)
|-- purchase_product_ids: array (nullable = true)
| |-- element: long (containsNull = true)
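For reference, here is a minimal sketch that reproduces this schema, assuming a SparkSession in scope as spark; the session ids and product ids are hypothetical sample values:

import org.apache.spark.sql.functions.col
import spark.implicits._

// Hypothetical sample data reproducing the schema above.
val viewsPurchasesGrouped = Seq(
    (BigDecimal(1), Seq(123L, 234L, 456L), Seq(678L)),
    (BigDecimal(2), Seq(345L), Seq(678L, 789L))
  ).toDF("session_id", "view_product_ids", "purchase_product_ids")
  .withColumn("session_id", col("session_id").cast("decimal(29,0)"))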
In Spark 1.6, I used this code to turn it into a shape suitable for MLlib:
import scala.collection.mutable.WrappedArray

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray),
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)
Since we switched, this no longer works.
I tried this:
val viewsPurchasesRddString2 = viewsPurchasesGrouped.select("view_product_ids", "purchase_product_ids").rdd.map( row =>
  Array(
    row.getSeq[Long](0).toArray,
    row.getSeq[Long](1).toArray
  )
)
and got this puzzling error message, which implies it is pulling session_id and purchase_product_ids from the original DataFrame instead of view_product_ids and purchase_product_ids:
Job aborted due to stage failure: [...] scala.MatchError: [14545234113341303814564569524,WrappedArray(123, 234, 456, 678, 789)]
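As a debugging aid (a sketch, not part of my original code), printing the schema of the selected DataFrame and accessing fields by name instead of by ordinal sidesteps this kind of positional mix-up:

val selected = viewsPurchasesGrouped.select("view_product_ids", "purchase_product_ids")

// Confirm which column sits at which ordinal before mapping over rows.
selected.printSchema()

// Accessing fields by name avoids depending on column order entirely.
val byName = selected.rdd.map { row =>
  Array(
    row.getAs[Seq[Long]]("view_product_ids").toArray,
    row.getAs[Seq[Long]]("purchase_product_ids").toArray
  )
}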
I also tried this:
import org.apache.spark.sql.Row

val viewsPurchasesRddString = viewsPurchasesGrouped.map {
  case Row(session_id: Long, view_product_ids: Array[Long], purchase_product_ids: Array[Long]) =>
    (view_product_ids, purchase_product_ids)
}
which fails with:
viewsPurchasesRddString: org.apache.spark.sql.Dataset[(Array[Long], Array[Long])] = [_1: array<bigint>, _2: array<bigint>]
prefixSpan: org.apache.spark.mllib.fpm.PrefixSpan = org.apache.spark.mllib.fpm.PrefixSpan@10d69876
<console>:67: error: overloaded method value run with alternatives:
[Item, Itemset <: Iterable[Item], Sequence <: Iterable[Itemset]](data: org.apache.spark.api.java.JavaRDD[Sequence])org.apache.spark.mllib.fpm.PrefixSpanModel[Item] <and>
[Item](data: org.apache.spark.rdd.RDD[Array[Array[Item]]])(implicit evidence:
scala.reflect.ClassTag[Item])org.apache.spark.mllib.fpm.PrefixSpanModel[Item] cannot be applied to (org.apache.spark.sql.Dataset[(Array[Long], Array[Long])])
val model = prefixSpan.run(viewsPurchasesRddString)
^
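The signatures in the error show that PrefixSpan.run wants an RDD[Array[Array[Item]]], not a Dataset of tuples. A sketch of bridging that gap from this attempt, under the assumption that the views and purchases of a session are meant to be the two itemsets of one sequence:

import org.apache.spark.rdd.RDD

// Convert the Dataset[(Array[Long], Array[Long])] into the nested-array
// shape PrefixSpan.run accepts: one sequence per session, with the view
// ids and the purchase ids as its two itemsets.
val sequences: RDD[Array[Array[Long]]] = viewsPurchasesRddString.rdd.map {
  case (views, purchases) => Array(views, purchases)
}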
How do I port my code correctly?
Your original Spark 1.6 code reads these columns as array<string> (WrappedArray[String]), so you should not access them as Seq[Long]. In Spark 1.6, map on a DataFrame automatically dropped down to the RDD API; in Spark 2 you need to call rdd.map explicitly to do the same. So I'd suggest this should work:
val viewsPurchasesRddString = viewsPurchasesGrouped.rdd.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray),
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)
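For completeness, a minimal sketch of feeding the resulting RDD to PrefixSpan; the minSupport and maxPatternLength values here are hypothetical placeholders to tune for your data:

import org.apache.spark.mllib.fpm.PrefixSpan

// Hypothetical parameters; adjust for your dataset.
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.1)
  .setMaxPatternLength(5)

// run accepts an RDD[Array[Array[Item]]], which the mapped RDD satisfies.
val model = prefixSpan.run(viewsPurchasesRddString)

// Print a few frequent sequences with their counts.
model.freqSequences.take(10).foreach { fs =>
  println(s"${fs.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}, ${fs.freq}")
}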