在 Scala 数据框中查找两个非空记录之间的空记录数

Find number of null records between two non-null records in scala dataframe

我有一个数据框,如下所示。

|  ID | date       | sig01_diff |
+-----+------------+------------+
| 123 | 2019-11-04 | 93668      |
| 123 | 2019-11-05 | 49350      |
| 123 | 2019-11-07 | null       |
| 123 | 2019-11-08 | 11069      |
| 123 | 2019-11-09 | 33203      |
| 123 | 2019-11-11 | 47927      |
| 123 | 2020-01-21 | null       |
| 123 | 2020-01-22 | null       |
| 123 | 2020-01-23 | 33908      |
| 123 | 2020-01-24 | 61603      |
| 123 | 2020-01-27 | 33613      |
| 123 | 2020-01-28 | 27514      |
| 123 | 2020-01-29 | null       |
| 123 | 2020-01-30 | null       |
| 123 | 2020-02-11 | null       |
| 123 | 2020-02-12 | null       |
| 123 | 2020-02-13 | null       |
| 123 | 2020-02-14 | null       |
| 123 | 2020-02-15 | 65625      |
| 123 | 2020-02-17 | 13354      |
| 123 | 2020-02-18 | null       |
| 123 | 2020-02-19 | 69069      |
+-----+------------+------------+

我必须获取记录之前的空记录数,如下所示。

|  ID | date       | sig01_diff |null_count |
+-----+------------+------------+-----------+
| 123 | 2019-11-04 | 93668      | 00        |
| 123 | 2019-11-05 | 49350      | 00        |
| 123 | 2019-11-07 | null       | 00        |
| 123 | 2019-11-08 | 11069      | 01        |
| 123 | 2019-11-09 | 33203      | 00        |
| 123 | 2019-11-11 | 47927      | 00        |
| 123 | 2020-01-21 | null       | 00        |
| 123 | 2020-01-22 | null       | 00        |
| 123 | 2020-01-23 | 33908      | 02        |
| 123 | 2020-01-24 | 61603      | 00        |
| 123 | 2020-01-27 | 33613      | 00        |
| 123 | 2020-01-28 | 27514      | 00        |
| 123 | 2020-01-29 | null       | 00        |
| 123 | 2020-01-30 | null       | 00        |
| 123 | 2020-02-11 | null       | 00        |
| 123 | 2020-02-12 | null       | 00        |
| 123 | 2020-02-13 | null       | 00        |
| 123 | 2020-02-14 | null       | 00        |
| 123 | 2020-02-15 | 65625      | 06        |
| 123 | 2020-02-17 | 13354      | 00        |
| 123 | 2020-02-18 | null       | 00        |
| 123 | 2020-02-19 | 69069      | 01        |
+-----+------------+------------+-----------+

如上所示,新列将在该记录之前包含一些空记录。 例如以下日期:

2019-11-08
2020-02-15

使用 window 函数和 unboundpreceding,我能够在 window 中逐步找到空记录的计数。但我的要求是 window 两个非空记录之间的空记录数。

我怎样才能做到这一点?任何潜在客户表示赞赏!

您应该使用 window 函数,其中 rowsBetween 定义为 Window.currentRow - 1 作为上限。这是一个例子:

import org.apache.spark.sql.functions.{count, when, col}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = List((1, Some(1)), (2, None), (3, Some(3)), (4, None), (5, Some(4))).toDF("id", "nullable_value")

val w = Window
  .orderBy(col("id"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

df.withColumn("not_null_count", count(when(col("nullable_value").isNull, 1)).over(w)).show()

+---+--------------+--------------+
| id|nullable_value|not_null_count|
+---+--------------+--------------+
|  1|             1|             0|
|  2|          null|             0|
|  3|             3|             1|
|  4|          null|             1|
|  5|             4|             2|
+---+--------------+--------------+

似乎没有直接的函数可以找出两个 non-null 记录之间的空记录数。

下面提供了这个问题的有效解决方案。

%scala
import spark.implicits._
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window

val dfTest = Seq(
    (1,1,10,1),
    (1,2,10,2),
    (1,3,0,1),
    (1,4,20,1),
    (1,5,0,1),
    (1,6,0,1),
    (1,7,60,0),
    (1,8,0,2),
    (1,9,0,1),
    (1,10,0,1),
    (1,11,80,1),
    (1,7,60,1),
    (2,1,10,1),
    (2,2,10,2),
    (2,3,0,1),
    (2,4,20,0),
    (2,5,0,0),
    (2,6,0,1),
    (2,7,60,1)
  ).toDF("ID","date","A","B")

val test = dfTest.withColumn("A", when((col("A") === 0),null).otherwise(col("A")))
  .withColumn("B", when((col("B") === 0),null).otherwise(col("B")))

输入数据框如下所示。

+---+----+----+----+
| ID|date|   A|   B|
+---+----+----+----+
|  1|   1|  10|   1|
|  1|   2|  10|   2|
|  1|   3|null|   1|
|  1|   4|  20|   1|
|  1|   5|null|   1|
|  1|   6|null|   1|
|  1|   7|  60|null|
|  1|   8|null|   2|
|  1|   9|null|   1|
|  1|  10|null|   1|
|  1|  11|  80|   1|
|  1|   7|  60|   1|
|  2|   1|  10|   1|
|  2|   2|  10|   2|
|  2|   3|null|   1|
|  2|   4|  20|null|
|  2|   5|null|null|
|  2|   6|null|   1|
|  2|   7|  60|   1|
+---+----+----+----+

解决方案如下。

val w2 = Window.partitionBy("ID").orderBy("date")
val w3 = Window.partitionBy("ID").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow-1)


val newDf = test
  .withColumn("A_cnt", count(when(col("A").isNull, 1)).over(w3))
  .withColumn("B_cnt", count(when(col("B").isNull, 1)).over(w3))
  .withColumn("A_null", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_cnt")).otherwise(null))
  .withColumn("B_null", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_cnt")).otherwise(null))
  .withColumn("A_null_cnt", when(col("A").isNotNull && lag(col("A"),1).over(w2).isNull, col("A_null") - last("A_null", true).over(w3)).otherwise(null))
  .withColumn("B_null_cnt", when(col("B").isNotNull && lag(col("B"),1).over(w2).isNull, col("B_null") - last("B_null", true).over(w3)).otherwise(null))
  .drop("A_cnt")
  .drop("B_cnt")
  .drop("A_null")
  .drop("B_null")

输出如下所示。

+---+----+----+----+------------+------------+
| ID|date|   A|   B|  A_null_cnt|  B_null_cnt|
+---+----+----+----+------------+------------+
|  1|   1|  10|   1|        null|        null|
|  1|   2|  10|   2|        null|        null|
|  1|   3|null|   1|        null|        null|
|  1|   4|  20|   1|           1|        null|
|  1|   5|null|   1|        null|        null|
|  1|   6|null|   1|        null|        null|
|  1|   7|  60|null|           2|        null|
|  1|   7|  60|   1|        null|           1|
|  1|   8|null|   2|        null|        null|
|  1|   9|null|   1|        null|        null|
|  1|  10|null|   1|        null|        null|
|  1|  11|  80|   1|           3|        null|
|  2|   1|  10|   1|        null|        null|
|  2|   2|  10|   2|        null|        null|
|  2|   3|null|   1|        null|        null|
|  2|   4|  20|null|           1|        null|
|  2|   5|null|null|        null|        null|
|  2|   6|null|   1|        null|           2|
|  2|   7|  60|   1|           2|        null|
+---+----+----+----+------------+------------+