scala spark dataframe modify column with udf return value

I have a Spark DataFrame with a timestamp field that I want to convert to a long (epoch milliseconds). I wrote a UDF and the standalone code works fine, but when I plug it into generic logic that should convert any timestamp column, I can't get it working. The issue is: how do I apply the UDF's return value back onto the DataFrame column?

Below is the code snippet:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test3").getOrCreate()
    import spark.implicits._

    // sqlContext.jsonRDD was removed in Spark 2.x; read the JSON strings as a Dataset[String] instead
    val df2 = spark.read.json(spark.createDataset(spark.sparkContext.parallelize(Seq(
      """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}"""
    ))))

    val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
      manTs.getTime
    }

    df2.withColumn("manufacture_ts", convertTimeStamp(df2("manufacture_ts"))).show()

    +-----+----------+-----+--------------+-----+----+
    |blank|   comment| make|manufacture_ts|model|year|
    +-----+----------+-----+--------------+-----+----+
    |     |No Comment|Tesla| 1508126400000|    S|2012|
    |     |   Get one| Ford| 1508126400000| E350|1997|
    |     |          |Chevy| 1508126400000| Volt|2015|
    +-----+----------+-----+--------------+-----+----+

Now I want to invoke this generically, so it is called on all columns of a DataFrame that are of timestamp type (converting them to long):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    object Test4 extends App {

      val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()
      import spark.implicits._

      val long: Long = "1508299200000".toLong

      val data = Seq(Row("10000020_LUX_OTC", long, "2020-02-14"))

      val schema = List(
        StructField("rowkey", StringType, true),
        StructField("order_receipt_dt", LongType, true),
        StructField("maturity_dt", StringType, true))

      val dataDF = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

      // Fold over the schema, replacing each column with the result of transformLong
      val modifedDf2 = schema.foldLeft(dataDF) { case (newDF, StructField(name, dataType, flag, metadata)) =>
        newDF.withColumn(name, DataTypeUtil.transformLong(newDF, name, dataType.typeName))
      }

      modifedDf2.show()
    }


    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions._

    object DataTypeUtil {

      val convertTimeStamp = udf { (manTs: java.sql.Timestamp) =>
        manTs.getTime
      }

      def transformLong(dataFrame: DataFrame, name: String, fieldType: String): Column = {
        fieldType.toLowerCase match {
          case "timestamp" => convertTimeStamp(dataFrame(name))
          case _           => dataFrame.col(name)
        }
      }
    }

Your UDF probably crashes when the timestamp is null. You can do the following:

  • Use unix_timestamp instead of a UDF, or make your UDF null-safe (see the sketch after this list).
  • Apply it only to the fields that actually need conversion.
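
For the null-safe option, a minimal sketch of the convertTimeStamp UDF from the question, wrapping the input in an Option so a null timestamp yields a SQL NULL instead of a NullPointerException (the convertTimeStampSafe name is just for illustration), could be:

    // Null-safe variant: Option(null) is None, which Spark writes back as a SQL NULL
    val convertTimeStampSafe = udf { (manTs: java.sql.Timestamp) =>
      Option(manTs).map(_.getTime)
    }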

Given this data:

    import java.sql.Timestamp
    import java.time.LocalDateTime

    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType

    val df = Seq(
      (1L, Timestamp.valueOf(LocalDateTime.now()), Timestamp.valueOf(LocalDateTime.now()))
    ).toDF("id", "ts1", "ts2")

you can do:

    val newDF = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
      .foldLeft(df)((df, field) => df.withColumn(field, unix_timestamp(col(field))))

    newDF.show()

which gives:

    +---+----------+----------+
    | id|       ts1|       ts2|
    +---+----------+----------+
    |  1|1589109282|1589109282|
    +---+----------+----------+
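
One caveat on units: unix_timestamp returns whole seconds since the epoch (as in the output above), whereas the getTime UDF from the question returns milliseconds. If you need milliseconds without a UDF, a minimal sketch relying on Spark's timestamp-to-double cast, which yields fractional epoch seconds (the newDfMillis name is just for illustration), could be:

    // Cast each timestamp to fractional epoch seconds, scale to milliseconds, truncate to long
    val newDfMillis = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
      .foldLeft(df)((df, field) => df.withColumn(field, (col(field).cast("double") * 1000).cast("long")))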