如何将 Spark 数据帧中的任意数量的列从时间戳转换为 Longs?
How can I convert an arbitrary number of columns in a Spark dataframe from Timestamps to Longs?
我正在用 Scala 编写此代码并使用 Spark 1.6,无法切换到较新的版本。我正在尝试合并两个数据帧,一个从 Hadoop 集群上的 Avro 文件中提取,一个从 Teradata 数据库中提取。我可以很好地阅读它们,并且保证它们以相同的顺序具有相同的列名,但是当我尝试使用
合并它们时
data1.unionAll(data2)
我遇到了一个错误,因为 Avro 将时间戳转换为长整数,所以这两个字段的数据类型不匹配。这个过程将重复几次,我知道表中总会有至少一个时间戳字段,但可能会有更多,我不会总是知道它们的名字,所以我试图制定一种通用方法将任意数量的列从时间戳转换为长整型。这是我目前所拥有的:
def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
df.dtypes.foreach { f =>
val fName = f._1
val fType = f._2
if (fType == "TimestampType:) {
println("Found timestamp col: " + fName)
df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
df.printSchema()
}
}
return df
}
根据打印输出,我可以看出该方法仅正确识别时间戳列,但 .withColumn 转换不起作用。在下一行中打印架构不会显示更新的列。此外,我还尝试为转换后的值创建一个全新的列,但它也没有添加到 df 中。谁能看出这不起作用的原因?
下面一行就是 transformation
df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
在执行 action
之前不会反映在原始 dataframe
上。分配将作为一个动作工作,因此您可以创建一个临时 dataframe
并在循环中分配给它作为
def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
var tempDF = df
df.schema.map(f => {
val fName = f.name
val fType = f.dataType
if (fType.toString == "TimestampType") {
println("Found timestamp col: " + fName)
tempDF = tempDF.withColumn(fName, convert_timestamp_udf(df.col(fName)))
tempDF.printSchema()
}
})
return tempDF
}
希望回答对你有帮助
val index = ss.sparkContext.parallelize( Seq((1,"2017-5-5"),
(2,"2017-5-5"),
(3,"2017-5-5"),
(4,"2017-5-5"),
(5,"2017-5-5"))).toDF("ID", "time")
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
val newDF = index.withColumn("time", convert_timestamp_udf($"time"))
newDF.show
避免使用可变 var
的一种方法是,您可以通过组合 TimestampType
的列列表并通过 foldLeft
遍历列表来执行类型转换UDF:
import java.sql.Timestamp
val df = Seq(
(1, Timestamp.valueOf("2016-05-01 11:30:00"), "a", Timestamp.valueOf("2017-06-01 07:00:30")),
(2, Timestamp.valueOf("2016-06-01 12:30:00"), "b", Timestamp.valueOf("2017-07-01 08:00:30")),
(3, Timestamp.valueOf("2016-07-01 13:30:00"), "c", Timestamp.valueOf("2017-08-01 09:00:30"))
).toDF("id", "date1", "status", "date2")
val convert_timestamp_udf = udf( (time: Timestamp) => time.getTime() )
// Assemble all columns filtered with type TimestampType
val tsColumns = df.dtypes.filter(x => x._2 == "TimestampType")
// Create new dataframe by converting all Timestamps to Longs via foldLeft
val dfNew = tsColumns.foldLeft( df )(
(acc, x) => acc.withColumn(x._1, convert_timestamp_udf(df(x._1)))
)
dfNew.show
+---+-------------+------+-------------+
| id| date1|status| date2|
+---+-------------+------+-------------+
| 1|1462127400000| a|1496325630000|
| 2|1464809400000| b|1498921230000|
| 3|1467405000000| c|1501603230000|
+---+-------------+------+-------------+
我正在用 Scala 编写此代码并使用 Spark 1.6,无法切换到较新的版本。我正在尝试合并两个数据帧,一个从 Hadoop 集群上的 Avro 文件中提取,一个从 Teradata 数据库中提取。我可以很好地阅读它们,并且保证它们以相同的顺序具有相同的列名,但是当我尝试使用
合并它们时 data1.unionAll(data2)
我遇到了一个错误,因为 Avro 将时间戳转换为长整数,所以这两个字段的数据类型不匹配。这个过程将重复几次,我知道表中总会有至少一个时间戳字段,但可能会有更多,我不会总是知道它们的名字,所以我试图制定一种通用方法将任意数量的列从时间戳转换为长整型。这是我目前所拥有的:
def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
df.dtypes.foreach { f =>
val fName = f._1
val fType = f._2
if (fType == "TimestampType:) {
println("Found timestamp col: " + fName)
df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
df.printSchema()
}
}
return df
}
根据打印输出,我可以看出该方法仅正确识别时间戳列,但 .withColumn 转换不起作用。在下一行中打印架构不会显示更新的列。此外,我还尝试为转换后的值创建一个全新的列,但它也没有添加到 df 中。谁能看出这不起作用的原因?
下面一行就是 transformation
df.withColumn(fName, convert_timestamp_udf(df.col(fName)))
在执行 action
之前不会反映在原始 dataframe
上。分配将作为一个动作工作,因此您可以创建一个临时 dataframe
并在循环中分配给它作为
def transformTimestamps(df: DataFrame): DataFrame = {
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
var tempDF = df
df.schema.map(f => {
val fName = f.name
val fType = f.dataType
if (fType.toString == "TimestampType") {
println("Found timestamp col: " + fName)
tempDF = tempDF.withColumn(fName, convert_timestamp_udf(df.col(fName)))
tempDF.printSchema()
}
})
return tempDF
}
希望回答对你有帮助
val index = ss.sparkContext.parallelize( Seq((1,"2017-5-5"),
(2,"2017-5-5"),
(3,"2017-5-5"),
(4,"2017-5-5"),
(5,"2017-5-5"))).toDF("ID", "time")
val convert_timestamp_udf = udf((time:Timestamp) => time.getTime())
val newDF = index.withColumn("time", convert_timestamp_udf($"time"))
newDF.show
避免使用可变 var
的一种方法是,您可以通过组合 TimestampType
的列列表并通过 foldLeft
遍历列表来执行类型转换UDF:
import java.sql.Timestamp
val df = Seq(
(1, Timestamp.valueOf("2016-05-01 11:30:00"), "a", Timestamp.valueOf("2017-06-01 07:00:30")),
(2, Timestamp.valueOf("2016-06-01 12:30:00"), "b", Timestamp.valueOf("2017-07-01 08:00:30")),
(3, Timestamp.valueOf("2016-07-01 13:30:00"), "c", Timestamp.valueOf("2017-08-01 09:00:30"))
).toDF("id", "date1", "status", "date2")
val convert_timestamp_udf = udf( (time: Timestamp) => time.getTime() )
// Assemble all columns filtered with type TimestampType
val tsColumns = df.dtypes.filter(x => x._2 == "TimestampType")
// Create new dataframe by converting all Timestamps to Longs via foldLeft
val dfNew = tsColumns.foldLeft( df )(
(acc, x) => acc.withColumn(x._1, convert_timestamp_udf(df(x._1)))
)
dfNew.show
+---+-------------+------+-------------+
| id| date1|status| date2|
+---+-------------+------+-------------+
| 1|1462127400000| a|1496325630000|
| 2|1464809400000| b|1498921230000|
| 3|1467405000000| c|1501603230000|
+---+-------------+------+-------------+