对数据框的操作
Operation on Data Frame
我使用 Spark 1.3 中的 DataFrame API。
我想在不丢失 DataFrame 的所有元素的情况下从 DataFrame 中的日期获取星期几。
在使用DataFrame之前,我曾经使用jodatime在简单的地图上获取它API。
目前有效的解决方案:
sqlContext.createDataFrame(myDataFrame.map(l=>operationOnTheField(l)),myDataFrame.schema))
是否可以在不返回 RDD[Row]
上的地图的情况下进行操作,然后使用此 RDD 创建 DataFrame?
您可以结合使用对 DataFrame
调用 select()
和用户定义的函数 (UDF) 来转换相关列。
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._
一例class建立实例DataFrame
.
private case class Cust(id: Integer, name: String,
sales: Double, discount: Double, state: String)
然后设置一个SQLContext
,创建DataFrame
如下:
import sqlContext.implicits._
val custs = Seq(
Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()
注册一个简单的 UDF,您将使用它来转换 "discount" 列。
val myFunc = udf {(x: Double) => x + 1}
获取列,已将 UDF 应用到 "discount" 列并将其他列保持原样。
val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")
我想找到一种 "better" 方法来匹配该列,但以下方法有效。
使用 as()
给列一个新名称,因为我们可以!
val mappedCols = cols.map(c =>
if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)
使用 select() 生成新的 DataFrame
val newDF = customerDF.select(mappedCols:_*)
你变了
id name sales discount state
1 Widget Co 120000.0 0.0 AZ
2 Acme Widgets 410500.0 500.0 CA
3 Widgetry 410500.0 200.0 CA
4 Widgets R Us 410500.0 0.0 CA
5 Ye Olde Widgete 500.0 0.0 MA
进入
id name sales transformed state
1 Widget Co 120000.0 1.0 AZ
2 Acme Widgets 410500.0 501.0 CA
3 Widgetry 410500.0 201.0 CA
4 Widgets R Us 410500.0 1.0 CA
5 Ye Olde Widgete 500.0 1.0 MA
您可以找到完整的示例 source code here。如果您对精确的列替换不挑剔,可以使其更简单。
试试这个
Table.select(Table("Otherkey"),MyUdf(Table("ColNeeded")).as("UdfTransformed"))
MyUdf是你自己定义的udf。
我使用 Spark 1.3 中的 DataFrame API。
我想在不丢失 DataFrame 的所有元素的情况下从 DataFrame 中的日期获取星期几。
在使用DataFrame之前,我曾经使用jodatime在简单的地图上获取它API。
目前有效的解决方案:
sqlContext.createDataFrame(myDataFrame.map(l=>operationOnTheField(l)),myDataFrame.schema))
是否可以在不返回 RDD[Row]
上的地图的情况下进行操作,然后使用此 RDD 创建 DataFrame?
您可以结合使用对 DataFrame
调用 select()
和用户定义的函数 (UDF) 来转换相关列。
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._
一例class建立实例DataFrame
.
private case class Cust(id: Integer, name: String,
sales: Double, discount: Double, state: String)
然后设置一个SQLContext
,创建DataFrame
如下:
import sqlContext.implicits._
val custs = Seq(
Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()
注册一个简单的 UDF,您将使用它来转换 "discount" 列。
val myFunc = udf {(x: Double) => x + 1}
获取列,已将 UDF 应用到 "discount" 列并将其他列保持原样。
val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")
我想找到一种 "better" 方法来匹配该列,但以下方法有效。
使用 as()
给列一个新名称,因为我们可以!
val mappedCols = cols.map(c =>
if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)
使用 select() 生成新的 DataFrame
val newDF = customerDF.select(mappedCols:_*)
你变了
id name sales discount state
1 Widget Co 120000.0 0.0 AZ
2 Acme Widgets 410500.0 500.0 CA
3 Widgetry 410500.0 200.0 CA
4 Widgets R Us 410500.0 0.0 CA
5 Ye Olde Widgete 500.0 0.0 MA
进入
id name sales transformed state
1 Widget Co 120000.0 1.0 AZ
2 Acme Widgets 410500.0 501.0 CA
3 Widgetry 410500.0 201.0 CA
4 Widgets R Us 410500.0 1.0 CA
5 Ye Olde Widgete 500.0 1.0 MA
您可以找到完整的示例 source code here。如果您对精确的列替换不挑剔,可以使其更简单。
试试这个
Table.select(Table("Otherkey"),MyUdf(Table("ColNeeded")).as("UdfTransformed"))
MyUdf是你自己定义的udf。