Dropping duplicate records using a window function in Spark Scala
Or, just to make this simple to understand:
I have a data frame:
DataPartition TimeStamp OrganizationID SourceID AuditorID AuditorEnumerationId AuditorOpinionCode AuditorOpinionId IsPlayingAuditorRole IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode AuditorOpinionOnInternalControlsId AuditorOpinionOnGoingConcernId rank
Japan 2018-05-03T09:52:48+00:00 4295876589 194 null null null null null null null O|!| null null null null 1
Japan 2018-05-03T09:52:48+00:00 4295876589 194 2719 3023331 AOP 3010542 true false true O|!| null null null null 1
Japan 2018-05-03T09:52:48+00:00 4295876589 195 16157 1002485247 UWE 3010547 true false false O|!| null null null null 1
Japan 2018-05-03T07:36:47+00:00 4295876589 196 3252 3024053 ONC 3020538 true false true O|!| null null null null 1
Japan 2018-05-03T07:36:47+00:00 4295876589 195 5937 3026578 NOP 3010543 true false true O|!| null null null null 1
Japan 2018-05-02T10:37:50+00:00 4295876589 156 null null null null null null null O|!| null null null null 1
Japan 2018-05-02T10:37:50+00:00 4295876589 157 null null null null null null null O|!| null null null null 1
Japan 2018-05-02T10:37:56+00:00 4295876589 193 null null null null null null null O|!| null null null null 1
Japan 2018-05-03T08:10:19+00:00 4295876589 196 null null null null null null null D|!| null null null null 1
Japan 2018-05-03T09:52:48+00:00 4295876589 195 null null null null null null null O|!| null null null null 1
Now I need to select the rows that have rank = 1 and AuditorID != null, but the AuditorID =!= null check should apply only when FFAction|!| = "O|!|".
In that case my output data frame should look like this:
DataPartition TimeStamp OrganizationID SourceID AuditorID AuditorEnumerationId AuditorOpinionCode AuditorOpinionId IsPlayingAuditorRole IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode AuditorOpinionOnInternalControlsId AuditorOpinionOnGoingConcernId rank
Japan 2018-05-03T09:52:48+00:00 4295876589 194 2719 3023331 AOP 3010542 true false true O|!| null null null null 1
Japan 2018-05-03T09:52:48+00:00 4295876589 195 16157 1002485247 UWE 3010547 true false false O|!| null null null null 1
Japan 2018-05-03T07:36:47+00:00 4295876589 196 3252 3024053 ONC 3020538 true false true O|!| null null null null 1
Japan 2018-05-03T07:36:47+00:00 4295876589 195 5937 3026578 NOP 3010543 true false true O|!| null null null null 1
Japan 2018-05-02T10:37:56+00:00 4295876589 193 null null null null null null null I|!| null null null null 1
Japan 2018-05-03T08:10:19+00:00 4295876589 196 null null null null null null null D|!| null null null null 1
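Put differently, the rule I am after could be sketched as a single keep-predicate (a sketch only; the "null" literal assumes the nulls in my frame are stored as the string "null", as they appear above):

import org.apache.spark.sql.Column
import spark.implicits._   // assumes a SparkSession in scope as `spark`

// Keep a row when FFAction|!| is anything other than "O|!|", or when it is
// "O|!|" and the AuditorID is populated (not the string "null").
val keepRule: Column = ($"FFAction|!|" =!= "O|!|") || ($"AuditorID" =!= "null")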
Here is my code:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._   // unix_timestamp, row_number
import spark.implicits._                  // $"..." syntax; assumes a SparkSession `spark`

val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
// Problem: the unconditional AuditorID check also drops the D|!| row for SourceID 196.
val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && $"AuditorID" =!= "null")
Scenario 2 ...
Here is my data frame:
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear TimeStamp
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 310 182 INC 500186 null null null null O|!| Japan 2018 2018-05-10T08:30:53+00:00
When I apply the code suggested in the answer:
val windowSpec = Window
  .partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && (($"UpdateReason_updateReasonId" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))
  .drop("rank")

I get the following output:
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
But my expected output is this:
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
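One way I can think of to get that row is to partition without UpdateReason_updateReasonId and keep only the newest row per key (a sketch; it assumes the latest record should win regardless of UpdateReason_updateReasonId and that no extra null filter is wanted for this table):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes a SparkSession in scope as `spark`

// With UpdateReason_updateReasonId out of the partition key, the I|!| and O|!|
// rows for PeriodId 297 compete in one partition and only the newest survives.
val windowSpec2 = Window
  .partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestPerKey = tempReorder
  .withColumn("rank", row_number().over(windowSpec2))
  .filter($"rank" === 1)
  .drop("rank")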
You can use the row_number window function to remove the duplicates, then keep the rows where rank = 1 and the AuditorID is not null, applying the AuditorID check only when FFAction|!| is "O|!|".
Here is the working code for you:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._   // unix_timestamp, row_number
import spark.implicits._                  // $"..." syntax; assumes a SparkSession `spark`

// Keep the newest row per key; drop it only when FFAction|!| is "O|!|" and AuditorID is the string "null".
val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && (($"AuditorID" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))
This should give you:
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|DataPartition|TimeStamp |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|rank|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|Japan |2018-05-03T09:52:48+00:00|4295876589 |194 |2719 |3023331 |AOP |3010542 |true |false |true |O|!| |null |null |null |null |1 |
|Japan |2018-05-03T09:52:48+00:00|4295876589 |195 |16157 |1002485247 |UWE |3010547 |true |false |false |O|!| |null |null |null |null |1 |
|Japan |2018-05-03T07:36:47+00:00|4295876589 |196 |3252 |3024053 |ONC |3020538 |true |false |true |O|!| |null |null |null |null |1 |
|Japan |2018-05-03T07:36:47+00:00|4295876589 |195 |5937 |3026578 |NOP |3010543 |true |false |true |O|!| |null |null |null |null |1 |
|Japan |2018-05-03T08:10:19+00:00|4295876589 |196 |null |null |null |null |null |null |null |D|!| |null |null |null |null |1 |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
Note: the record with SourceID 193 has FFAction|!| = O|!| and a null AuditorID, so it should not be in the output.
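To sanity-check the pattern end to end, here is a minimal self-contained run on a toy frame (the values and the reduced column set are hypothetical, for illustration only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("dedup-check").getOrCreate()
import spark.implicits._

val toy = Seq(
  ("194", "null", "O|!|", "2018-05-03T09:52:48+00:00"),
  ("194", "2719", "O|!|", "2018-05-03T09:52:48+00:00"),
  ("196", "null", "D|!|", "2018-05-03T08:10:19+00:00")
).toDF("SourceID", "AuditorID", "FFAction|!|", "TimeStamp")

val w = Window.partitionBy("SourceID", "AuditorID")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

toy.withColumn("rank", row_number().over(w))
  .filter($"rank" === 1 && (($"AuditorID" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))
  .show(false)
// Expected: the 194/2719 O|!| row and the 196 D|!| row survive;
// the 194 row whose AuditorID is "null" is filtered out.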