如何根据 Window 中固定的列值增加计数器?
How to increment counters based on a column value being fixed in a Window?
我有一个数据集,随着时间的推移,它指示某些用户所在的区域。从这个数据集中,我想计算他们在每个位置度过的夜晚数。 "spending the night" 我的意思是:获取用户最后一次看到的位置,直到某一天的 23 点 59 分;如果该用户在第二天 05:00 之前观察到的所有位置,或者如果还有 none 之后的第一个位置,则与前一天的最后一个位置相匹配,那就是在该位置度过的一晚。
| Timestamp| User| Location|
|1462838468|49B4361512443A4DA...|1|
|1462838512|49B4361512443A4DA...|1|
|1462838389|49B4361512443A4DA...|2|
|1462838497|49B4361512443A4DA...|3|
|1465975885|6E9E0581E2A032FD8...|1|
|1457723815|405C238E25FE0B9E7...|1|
|1457897289|405C238E25FE0B9E7...|2|
|1457899229|405C238E25FE0B9E7...|11|
|1457972626|405C238E25FE0B9E7...|9|
|1458062553|405C238E25FE0B9E7...|9|
|1458241825|405C238E25FE0B9E7...|9|
|1458244457|405C238E25FE0B9E7...|9|
|1458412513|405C238E25FE0B9E7...|6|
|1458412292|405C238E25FE0B9E7...|6|
|1465197963|6E9E0581E2A032FD8...|6|
|1465202192|6E9E0581E2A032FD8...|6|
|1465923817|6E9E0581E2A032FD8...|5|
|1465923766|6E9E0581E2A032FD8...|2|
|1465923748|6E9E0581E2A032FD8...|2|
|1465923922|6E9E0581E2A032FD8...|2|
我想我需要在这里使用 Window 函数,并且我过去曾将 PySpark 用于其他用途,但我有点不知道从哪里开始。
我认为最后你确实需要一个函数来处理一系列事件并输出花费的夜晚......类似于(只是为了理解这个想法的例子):
def nights_spent(location_events):
# location_events is a list of events that have time and location
location_events = sort_by_time(location_events)
nights = []
prev_event = None
for event in location_events[1:]:
if prev_location is not None:
if next_day(prev_event.time, event.time) \
and same_location(prev_event.location, event.location):
# TODO: How do you handle when prev_event
# and event are more than 1 day apart?
nights.append(prev_location)
prev_location = location
return nights
然后,我认为第一个好的方法是首先按用户分组,这样您就可以获得给定用户的所有事件(包括位置和时间)。
然后您可以将该事件列表提供给上面的函数,您将在一个 RDD 中拥有所有 (user, nights_spent)
行。
所以,一般来说,RDD 看起来像这样:
nights_spent_per_user = all_events.map(lambda x => (x.user, [(x.time, x.location)])).reduce(lambda a, b: a + b).map(x => (x[0], nights_spent(x[1])))
希望对您有所帮助。
我有一个数据集,随着时间的推移,它指示某些用户所在的区域。从这个数据集中,我想计算他们在每个位置度过的夜晚数。 "spending the night" 我的意思是:获取用户最后一次看到的位置,直到某一天的 23 点 59 分;如果该用户在第二天 05:00 之前观察到的所有位置,或者如果还有 none 之后的第一个位置,则与前一天的最后一个位置相匹配,那就是在该位置度过的一晚。
| Timestamp| User| Location|
|1462838468|49B4361512443A4DA...|1|
|1462838512|49B4361512443A4DA...|1|
|1462838389|49B4361512443A4DA...|2|
|1462838497|49B4361512443A4DA...|3|
|1465975885|6E9E0581E2A032FD8...|1|
|1457723815|405C238E25FE0B9E7...|1|
|1457897289|405C238E25FE0B9E7...|2|
|1457899229|405C238E25FE0B9E7...|11|
|1457972626|405C238E25FE0B9E7...|9|
|1458062553|405C238E25FE0B9E7...|9|
|1458241825|405C238E25FE0B9E7...|9|
|1458244457|405C238E25FE0B9E7...|9|
|1458412513|405C238E25FE0B9E7...|6|
|1458412292|405C238E25FE0B9E7...|6|
|1465197963|6E9E0581E2A032FD8...|6|
|1465202192|6E9E0581E2A032FD8...|6|
|1465923817|6E9E0581E2A032FD8...|5|
|1465923766|6E9E0581E2A032FD8...|2|
|1465923748|6E9E0581E2A032FD8...|2|
|1465923922|6E9E0581E2A032FD8...|2|
我想我需要在这里使用 Window 函数,并且我过去曾将 PySpark 用于其他用途,但我有点不知道从哪里开始。
我认为最后你确实需要一个函数来处理一系列事件并输出花费的夜晚......类似于(只是为了理解这个想法的例子):
def nights_spent(location_events):
# location_events is a list of events that have time and location
location_events = sort_by_time(location_events)
nights = []
prev_event = None
for event in location_events[1:]:
if prev_location is not None:
if next_day(prev_event.time, event.time) \
and same_location(prev_event.location, event.location):
# TODO: How do you handle when prev_event
# and event are more than 1 day apart?
nights.append(prev_location)
prev_location = location
return nights
然后,我认为第一个好的方法是首先按用户分组,这样您就可以获得给定用户的所有事件(包括位置和时间)。
然后您可以将该事件列表提供给上面的函数,您将在一个 RDD 中拥有所有 (user, nights_spent)
行。
所以,一般来说,RDD 看起来像这样:
nights_spent_per_user = all_events.map(lambda x => (x.user, [(x.time, x.location)])).reduce(lambda a, b: a + b).map(x => (x[0], nights_spent(x[1])))
希望对您有所帮助。