获取组中的第一个非空值(Spark 1.6)

Get first non-null values in group by (Spark 1.6)

如何从分组依据中获取第一个非空值?我尝试使用 first with coalesce F.first(F.coalesce("code")) 但我没有得到所需的行为(我似乎得到了第一行)。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

我试过了:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())

期望的输出

[Row(id='a', code='code1', name='name2')]

对于 Spark 1.3 - 1.5,这可以解决问题:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

编辑

显然,在 1.6 版中,他们改变了 first 聚合函数的处理方式。现在,底层 class First should be constructed with a second argument ignoreNullsExpr parameter, which is not yet used by the first aggregate function (as can bee seen here). However, in Spark 2.0 it will be able to call agg(F.first(col, True)) to ignore nulls (as can be checked here).

因此,对于 Spark 1.6,该方法必须有所不同,并且效率低一些,不幸的是。一种想法如下:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

也许有更好的选择。如果我找到答案,我会编辑答案。

因为每个分组我只有一个非空值,所以在 1.6 中使用 min / max 可以满足我的目的:

(df
  .groupby("id")
  .agg(F.min("code"),
       F.min("name"))
  .show())

+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
|  a|    code1|    name2|
+---+---------+---------+