比较 Scala Spark Dataframe 中的 2 个数组

Compare 2 arrays in Scala Spark Dataframe

我有一个包含 2 列 Array[String] 的数据框,如下所示:

+-------------------+--------------------+--------------------+
|        HEURE_USAGE|        LISTE_CODE_1|        LISTE_CODE_2|
+-------------------+--------------------+--------------------+
|2019-09-06 11:34:57|[GBF401, GO0421, ...|[GB9P01, GO2621, ...|
|2019-09-02 13:27:49|[GO1180, BTMF01, ...|[GO3180, OLMP01, ...|
|2019-09-02 13:17:53|[GO1180, BTMF01, ...|[GO1180, BTMF01, ...|
|2019-09-06 11:27:05|[GBF401, GO0421, ...|[GBX401, GO0721, ...|
+-------------------+--------------------+--------------------+

我正在尝试创建一个列 'LISTE_CODE_3',它是每一行的列 'LISTE_CODE_1' 和列 'LISTE_CODE_2' 的交集。

Spark 2.4 中有一个完美的函数可以做到这一点。

returns不重复的交集是交集函数。 很遗憾,Spark 2.2 中不存在此功能。

我想也许我们应该比较一下。

你有什么想法吗?

您可以使用用户定义的函数:

spark.udf.register("intersect_arrays", (a: Seq[String], b: Seq[String]) => a intersect b)
spark.sql("select *, intersect_arrays(LISTE_CODE_1, LISTE_CODE_2) as LISTE_CODE_3 from ds")

或者在纯 Spark SQL 中执行(这里假设 HEURE_USAGE 在整个数据集中是唯一的):

spark.sql("""
   select ds.HEURE_USAGE, LISTE_CODE_1, LISTE_CODE_2, coalesce(inter, array()) as LISTE_CODE_3
   from ds left join (
     select HEURE_USAGE, collect_list(CODE_1) as inter from (
       select * from (
         select HEURE_USAGE, CODE_1, explode(LISTE_CODE_2) as CODE_2
         from (select HEURE_USAGE, explode(LISTE_CODE_1) as CODE_1, LISTE_CODE_2 as LISTE_CODE_2 from ds)
       ) where CODE_1 = CODE_2
   ) group by HEURE_USAGE) t
   on t.HEURE_USAGE = ds.HEURE_USAGE""")

想法是分解LISTE_CODE_1LISTE_CODE_2,只保留匹配CODE_1CODE_2的行,收集CODEs 到一个新数组中,并与原始数据框连接以保留所有原始行(即使是交集为空的行)。