Spark 数据帧及时变换 window
Spark dataframe transform in time window
我有两个数据框。 [AllAccounts]:包含对所有用户的所有帐户的审核
UserId, AccountId, Balance, CreatedOn
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00
[ActiveAccounts]:仅包含对任何用户的活动帐户(可以是 0 或 1)的审计
UserId, AccountId, CreatedOn
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
我想将它们转换成格式为
的单个 DF
UserId, AccountId, Balance, CreatedOn, IsActive
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00, true
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00, true
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00, true
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00, false
因此,基于 ActiveAccounts 中的帐户,我需要适当地标记第一个 df 中的行。如示例中所示,userId 1 的 acc2 在 2016-12-06T17:09:38.123-05:00 被标记为活动,acc3 在 2016-12-07T17:09:39.123-05:00 被标记为活动。所以顺便说一句,这些时间范围 acc2 将被标记为 true,而 2016-12-07T17:09:39 之后的 acc3 将被标记为 true。
执行此操作的有效方法是什么。
如果我理解正确,帐户 (1, acc1)
在其创建时间和 (1, acc2)
之间处于活动状态。
我们可以通过几个步骤完成此操作:
- 为每个帐户创建一个包含 start/end 次的数据框
- 加入
AllAccounts
- 标记结果数据帧的行
我没有测试过,所以可能有语法错误。
要完成第一个任务,我们需要按user
对数据帧进行分区,然后查看下一个创建时间。这需要一个 window 函数:
val window = Window.partitionBy("UserId").orderBy("StartTime")
val activeTimes = ActiveAccounts.withColumnRenamed("CreatedOn", "StartTime")
.withColumn("EndTime", lead("StartTime") over window)
请注意,每个用户的最后 EndTime
将是 null
。现在加入:
val withActive = AllAcounts.join(activeTimes, Seq("UserId", "AccountId"))
(如果您可能缺少某些帐户的活动时间,这应该是左联接。)
然后您必须完成并将帐户标记为活动帐户:
val withFlags = withActive.withColumn("isActive",
$"CreatedOn" >= $"StartTime" &&
($"EndTime".isNull || ($"CreatedOn" < $"EndTime)))
我有两个数据框。 [AllAccounts]:包含对所有用户的所有帐户的审核
UserId, AccountId, Balance, CreatedOn
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00
[ActiveAccounts]:仅包含对任何用户的活动帐户(可以是 0 或 1)的审计
UserId, AccountId, CreatedOn
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
我想将它们转换成格式为
的单个 DFUserId, AccountId, Balance, CreatedOn, IsActive
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00, true
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00, true
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00, true
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00, false
因此,基于 ActiveAccounts 中的帐户,我需要适当地标记第一个 df 中的行。如示例中所示,userId 1 的 acc2 在 2016-12-06T17:09:38.123-05:00 被标记为活动,acc3 在 2016-12-07T17:09:39.123-05:00 被标记为活动。所以顺便说一句,这些时间范围 acc2 将被标记为 true,而 2016-12-07T17:09:39 之后的 acc3 将被标记为 true。
执行此操作的有效方法是什么。
如果我理解正确,帐户 (1, acc1)
在其创建时间和 (1, acc2)
之间处于活动状态。
我们可以通过几个步骤完成此操作:
- 为每个帐户创建一个包含 start/end 次的数据框
- 加入
AllAccounts
- 标记结果数据帧的行
我没有测试过,所以可能有语法错误。
要完成第一个任务,我们需要按user
对数据帧进行分区,然后查看下一个创建时间。这需要一个 window 函数:
val window = Window.partitionBy("UserId").orderBy("StartTime")
val activeTimes = ActiveAccounts.withColumnRenamed("CreatedOn", "StartTime")
.withColumn("EndTime", lead("StartTime") over window)
请注意,每个用户的最后 EndTime
将是 null
。现在加入:
val withActive = AllAcounts.join(activeTimes, Seq("UserId", "AccountId"))
(如果您可能缺少某些帐户的活动时间,这应该是左联接。)
然后您必须完成并将帐户标记为活动帐户:
val withFlags = withActive.withColumn("isActive",
$"CreatedOn" >= $"StartTime" &&
($"EndTime".isNull || ($"CreatedOn" < $"EndTime)))