PySpark - 在没有显式会话密钥的情况下组合会话数据/遍历所有行

PySpark - Combining Session Data without Explicit Session Key / Iterating over All Rows

我正在尝试在 PySpark 中聚合没有真正会话 "key" 的会话数据。我有在特定时间在某个区域检测到个人的数据,我想将其汇总为特定访问期间在每个区域 花费的持续时间(见下文)。

这里棘手的部分是我想将某人离开每个区域的时间推断为他们在下一个区域被检测到的时间。这意味着我需要使用下一个区域 ID 的开始时间作为任何给定区域 ID 的结束时间。同一个人的区域 ID 也可以出现多次。

我在 MapReduce 中实现了这一点,我在其中遍历所有行并汇总时间,直到检测到新的 AreaID 或 Individual,然后输出记录。有没有办法在 Spark 中做类似的事情?有没有更好的方法来解决这个问题?

另外请注意,除非在另一个区域(例如下面的 IndividualY、AreaT)检测到该个体,否则我不想输出记录

我有以下格式的数据集:

Individual  AreaID  Datetime of Detection
IndividualX AreaQ   1/7/2015 0:00
IndividualX AreaQ   1/7/2015 1:00
IndividualX AreaW   1/7/2015 3:00
IndividualX AreaQ   1/7/2015 4:00
IndividualY AreaZ   2/7/2015 4:00
IndividualY AreaZ   2/7/2015 5:00
IndividualY AreaW   2/7/2015 6:00
IndividualY AreaT   2/7/2015 7:00

我想要期望的输出:

Individual  AreaID  Start_Time      End_Time        Duration (minutes)
IndividualX AreaQ   1/7/2015 0:00   1/7/2015 3:00   180
IndividualX AreaW   1/7/2015 3:00   1/7/2015 4:00   60
IndividualY AreaZ   2/7/2015 4:00   2/7/2015 6:00   120
IndividualY AreaW   2/7/2015 6:00   2/7/2015 7:00   60

这是一个特别漂亮的解决方案,但您可以使用 DataFrames 和 window 函数。假设您的输入如下所示:

rdd = sc.parallelize([
    ("IndividualX", "AreaQ",  "1/7/2015 0:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 1:00"),
    ("IndividualX", "AreaW",  "1/7/2015 3:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 5:00"),
    ("IndividualY", "AreaW",  "2/7/2015 6:00"),
    ("IndividualY", "AreaT",  "2/7/2015 7:00")
])

首先我们必须将其转换为 DataFrame:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

row = Row("individual", "area_id", "datetime")
fmt = "%d/%m/%Y %H:%M"
df = rdd.map(lambda r: row(r[0], r[1], datetime.strptime(r[2], fmt))).toDF()

接下来让我们定义一个window:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window().partitionBy("individual").orderBy("datetime")

和临时列:

p_area_id = f.lag("area_id").over(w) # Previous area

ind =  f.sum((
    p_area_id.isNull() | # No previous observation
    (p_area_id != f.col("area_id")) # Area changed
).cast("integer")).over(w)

使用上面定义的指标,我们可以选择访问该区域的最小时间戳:

tmp = (df
   .withColumn("ind", ind)
   .groupBy("individual", "area_id", "ind")
   .agg(f.min("datetime").alias("datetime"))
   .drop("ind"))

最后我们可以定义目标列:

end_time = f.lead(f.col("datetime")).over(w)

duration = (
    f.col("end_time").cast("integer") - f.col("datetime").cast("integer")) / 60

并构建输出 DataFrame:

result = (tmp
    .withColumn("end_time", end_time)
    .where(f.col("end_time").isNotNull())
    .withColumn("duration", duration)
    .withColumnRenamed("datetime", "start_time"))

并输出:

+-----------+-------+--------------------+--------------------+--------+
| individual|area_id|          start_time|            end_time|duration|
+-----------+-------+--------------------+--------------------+--------+
|IndividualX|  AreaQ|2015-07-01 00:00:...|2015-07-01 03:00:...|   180.0|
|IndividualX|  AreaW|2015-07-01 03:00:...|2015-07-01 04:00:...|    60.0|
|IndividualY|  AreaZ|2015-07-02 04:00:...|2015-07-02 06:00:...|   120.0|
|IndividualY|  AreaW|2015-07-02 06:00:...|2015-07-02 07:00:...|    60.0|
+-----------+-------+--------------------+--------------------+--------+

如果你更喜欢普通的 RDD,你可以重塑成这样的东西:

(individual, (area_id, datetime))

和下一个 groupByKey 并在本地执行所需的操作。

Zero323 的解决方案效果很好,但也想 post 一个 rdd 实现。我认为这对尝试将流式 MapReduce 转换为 pyspark 的人很有帮助。我的实现基本上将键(在本例中为个人)映射到与该键(区域和时间)关联的流值列表列表,然后迭代列表以满足迭代组件 - 其余只是正常减少通过按键和映射。


    from pyspark import SparkContext, SparkFiles, SparkConf
    from datetime import datetime

    conf = SparkConf()
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(["IndividualX|AreaQ|1/7/2015 0:00",
                          "IndividualX|AreaQ|1/7/2015 1:00",
                          "IndividualX|AreaW|1/7/2015 3:00",
                          "IndividualX|AreaQ|1/7/2015 4:00",
                          "IndividualY|AreaZ|2/7/2015 4:00",
                          "IndividualY|AreaZ|2/7/2015 5:00",
                          "IndividualY|AreaW|2/7/2015 6:00",
                          "IndividualY|AreaT|2/7/2015 7:00"])

    def splitReduce(x):
        y = x.split('|')
        return (str(y[0]),[[str(y[2]),str(y[1])]])

    def resultSet(x):

        processlist = sorted(x[1], key=lambda x: x[0])

        result = []
        start_area = processlist[0][1]
        start_date = datetime.strptime(processlist[0][0], '%d/%m/%Y %H:%M')
        dur = 0

        if len(processlist) > 1:

            for datearea in processlist[1::]:

                end_date = datetime.strptime(datearea[0],'%d/%m/%Y %H:%M')
                end_area = datearea[1]

                dur = (end_date-start_date).total_seconds()/60

                if start_area != end_area:
                    result.append([start_area,start_date,end_date,dur])
                    start_date = datetime.strptime(datearea[0], '%d/%m/%Y %H:%M')
                    start_area = datearea[1]
                    dur = 0

        return (x[0],result)

    def finalOut(x):
        return str(x[0]) + '|' + str(x[1][0]) + '|' + str(x[1][1]) + '|' + str(x[1][2]) + '|' + str(x[1][3])

    footfall = rdd\
    .map(lambda x: splitReduce(x))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: resultSet(x))\
    .flatMapValues(lambda x: x)\
    .map(lambda x: finalOut(x))\
    .collect()

    print footfall

提供以下输出:

['IndividualX|AreaQ|2015-07-01 00:00:00|2015-07-01 03:00:00|180.0',
'IndividualX|AreaW|2015-07-01 03:00:00|2015-07-01 04:00:00|60.0',
'IndividualY|AreaZ|2015-07-02 04:00:00|2015-07-02 06:00:00|120.0',
'IndividualY|AreaW|2015-07-02 06:00:00|2015-07-02 07:00:00|60.0']