Spark 数据帧及时变换 window

Spark dataframe transform in time window

我有两个数据框。 [AllAccounts]:包含对所有用户的所有帐户的审核

UserId, AccountId, Balance, CreatedOn
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00  
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00

[ActiveAccounts]:仅包含对任何用户的活动帐户(可以是 0 或 1)的审计

UserId, AccountId, CreatedOn
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00

我想将它们转换成格式为

的单个 DF
UserId, AccountId, Balance, CreatedOn, IsActive
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00, true 
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00, true
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00, true
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00, false

因此,基于 ActiveAccounts 中的帐户,我需要适当地标记第一个 df 中的行。如示例中所示,userId 1 的 acc2 在 2016-12-06T17:09:38.123-05:00 被标记为活动,acc3 在 2016-12-07T17:09:39.123-05:00 被标记为活动。所以顺便说一句,这些时间范围 acc2 将被标记为 true,而 2016-12-07T17:09:39 之后的 acc3 将被标记为 true。

执行此操作的有效方法是什么。

如果我理解正确,帐户 (1, acc1) 在其创建时间和 (1, acc2) 之间处于活动状态。

我们可以通过几个步骤完成此操作:

  • 为每个帐户创建一个包含 start/end 次的数据框
  • 加入AllAccounts
  • 标记结果数据帧的行

我没有测试过,所以可能有语法错误。

要完成第一个任务,我们需要按user对数据帧进行分区,然后查看下一个创建时间。这需要一个 window 函数:

val window = Window.partitionBy("UserId").orderBy("StartTime")
val activeTimes = ActiveAccounts.withColumnRenamed("CreatedOn", "StartTime")
  .withColumn("EndTime", lead("StartTime") over window)

请注意,每个用户的最后 EndTime 将是 null。现在加入:

val withActive = AllAcounts.join(activeTimes, Seq("UserId", "AccountId"))

(如果您可能缺少某些帐户的活动时间,这应该是左联接。)

然后您必须完成并将帐户标记为活动帐户:

val withFlags = withActive.withColumn("isActive",
  $"CreatedOn" >= $"StartTime" && 
 ($"EndTime".isNull || ($"CreatedOn" < $"EndTime)))