Problems with adding a new column to a dataframe - spark/scala

I am new to Spark/Scala. I am trying to read some data from a Hive table into a Spark DataFrame and then add a column based on some conditions. Here is my code:

val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")

def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
      ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
    }

val finalDF = DF.withColumn("status", 
                   when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
                  .otherwise("null"))

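dateDiff is a function that computes the difference between partition_date and item_due_date, which are columns in DF. I am trying to add a new column to DF using when and otherwise, where the conditions call dateDiff to get the difference between the dates.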

Now, when I run the code above, I get the following error: org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0

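I think the values of the partition_date column are not being converted to strings that can be parsed as dates. Is that what is happening? If so, how do I convert the column values to strings?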

Below is the schema of the columns I am using from DF:

 |-- item_due_date: string (nullable = true)
 |-- past_due: integer (nullable = true)
 |-- item_decision: string (nullable = true)
 |-- partition_date: string (nullable = true)

A sample of the data in the columns I am using from DF:

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|       1|   0001-01-14|         null|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       0|   2018-03-18|         null|    2017-11-22|
|       1|   2016-11-30|         null|    2017-11-22|
+--------+-------------+-------------+--------------+

I also tried using a custom UDF:

  def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
      if (past_due == 1 && item_due_date != "NULL") {
        if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
          if (item_decision != "NULL") "pending"
          else "approved"
        } else "expired"
      } else "NULL"
    }

val statusUDF = sqlContext.udf.register("statusUDF", status _)

val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()

It throws the following error at the DF2.show() statement every time:

Container exited with a non-zero exit code 50

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$head.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$$anonfun$head.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
        at driver$.main(driver.scala:109)
        at driver.main(driver.scala)

Any help would be appreciated. Thanks!

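You can simply use the built-in datediff function to get the difference in days between the two columns; you don't need to write your own function or a UDF. I have also modified the when expression from yours, replacing the equalTo(null) comparisons with isNull / isNotNull checks and a case-insensitive comparison against the string "null":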

import org.apache.spark.sql.functions._
val finalDF = DF.withColumn("status",
  when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && col("item_decision").isNotNull && !(lower(col("item_decision")).equalTo("null")), "approved")
    .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").isNull || lower(col("item_decision")).equalTo("null")), "pending")
      .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) >= 0), "expired")
    .otherwise("null"))))
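Since the same null checks repeat in every branch, the expression could also be factored into named pieces. The following is only a sketch of that idea, not a different algorithm: the names StatusColumn, isMissing and withStatus are made up for illustration, and the chained .when calls are assumed to be interchangeable with the nested otherwise(when(...)) form above.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, datediff, lower, when}

object StatusColumn {
  // Hypothetical helper: true when the value is SQL NULL or the text "null" in any letter case.
  def isMissing(c: Column): Column = c.isNull || lower(c) === "null"

  // Hypothetical wrapper applying the same three branches as the expression above.
  def withStatus(df: DataFrame): DataFrame = {
    val dueDate  = col("item_due_date")
    val decision = col("item_decision")

    // Shared precondition of all three branches.
    val pastDueWithDate = col("past_due") === 1 && !isMissing(dueDate)
    // datediff(end, start): days from item_due_date to partition_date.
    val days = datediff(col("partition_date"), dueDate)

    df.withColumn("status",
      when(pastDueWithDate && days < 0 && !isMissing(decision), "approved")
        .when(pastDueWithDate && days < 0 && isMissing(decision), "pending")
        .when(pastDueWithDate && days >= 0, "expired")
        .otherwise("null"))
  }
}
```

Usage would then be val finalDF = StatusColumn.withStatus(DF).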

This logic takes the following DataFrame:

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|1       |2017-12-14   |null         |2017-11-22    |
|1       |2017-12-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|0       |2018-03-18   |null         |2017-11-22    |
|1       |2016-11-30   |null         |2017-11-22    |
+--------+-------------+-------------+--------------+

and adds a status column, producing:

+--------+-------------+-------------+--------------+--------+
|past_due|item_due_date|item_decision|partition_date|status  |
+--------+-------------+-------------+--------------+--------+
|1       |2017-12-14   |null         |2017-11-22    |pending |
|1       |2017-12-14   |Mitigate     |2017-11-22    |approved|
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|0       |2018-03-18   |null         |2017-11-22    |null    |
|1       |2016-11-30   |null         |2017-11-22    |expired |
+--------+-------------+-------------+--------------+--------+
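One thing worth checking against your requirements is the sign convention. Spark's datediff(end, start) counts the days from start to end, so datediff(col("partition_date"), col("item_due_date")) is negative when the due date is after the partition date. ChronoUnit.DAYS.between(start, end) in your dateDiff takes its arguments the other way round, so the same pair of dates gives the opposite sign there. A small sketch with plain dates, assuming java.time in place of the org.threeten.bp backport (the object and method names are made up for illustration):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DateDiffSign {
  // Mirrors the question's helper: days from partition_date to item_due_date.
  def questionStyle(partitionDate: String, itemDueDate: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(partitionDate), LocalDate.parse(itemDueDate))

  // Mirrors datediff(col("partition_date"), col("item_due_date")):
  // days from item_due_date to partition_date.
  def answerStyle(partitionDate: String, itemDueDate: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(itemDueDate), LocalDate.parse(partitionDate))

  def main(args: Array[String]): Unit = {
    println(questionStyle("2017-11-22", "2017-12-14")) // 22
    println(answerStyle("2017-11-22", "2017-12-14"))   // -22
  }
}
```

That is why the rows with item_due_date 2017-12-14 land in the < 0 branches in the output above, while the rows whose due date is already in the past come out as expired. If you want the opposite behaviour, swap the two arguments to datediff.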

I hope this answer helps.
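As for the original DateTimeParseException: a Column is a description of an expression that Spark evaluates later for each row, not a row value. Calling toString on it on the driver gives a rendering of that expression (in your run, the text partition_date shown in the error message), and that text is what LocalDate.parse tried to read. The sketch below reproduces this outside of any DataFrame; it assumes java.time rather than org.threeten.bp, and the object name is made up for illustration.

```scala
import java.time.LocalDate
import scala.util.Try
import org.apache.spark.sql.functions.col

object ColumnIsNotAValue {
  def main(args: Array[String]): Unit = {
    // Building a Column needs no data and no running job.
    val partitionDate = col("partition_date")

    // Prints a description of the expression, never a date taken from a row.
    println(partitionDate.toString)

    // Parsing that description as an ISO date fails, as in the question.
    println(Try(LocalDate.parse(partitionDate.toString)).isFailure) // true
  }
}
```

Functions such as datediff avoid the problem because they take Column arguments and return a new Column, so the date arithmetic happens inside Spark, row by row.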