将最大值时间戳放入 PySpark 的数组中
Put largest value timestamp in array in PySpark
我有一个包含以下列的 PySpark 数据框(比如 df1
)
1.> category
- 包含独特的类别类型
2.> start_time_array
- 按升序排列的时间戳数组
3.> end_time_array
- 按升序排列的时间戳数组
4.> lenStart
- start_time_array
中的数组长度
5.> lenEnd
- end_time_array
中的数组长度
以下是 df1
的示例:
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B| [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 1|
+--------+------------------------------------------+------------------------------------------+--------+------+
还有另一个数据框 df2
,它包含两列 category
和 timestamp
。 df2
包含与 df1
相同的 category
值,df1
中数组内的时间戳值是 df2
中时间戳的子集。以下是 df2
的示例
+--------+-------------------+
|category| timestamp|
+--------+-------------------+
| A|2017-01-16 00:00:00|
| A|2017-01-18 00:00:00|
| A|2017-01-25 00:00:00|
| A|2017-01-27 00:00:00|
| B|2017-02-14 00:00:00|
| B|2017-02-18 00:00:00|
| B|2017-02-21 00:00:00|
| B|2017-02-22 00:00:00|
| B|2017-02-24 00:00:00|
| B|2017-02-25 00:00:00|
+--------+-------------------+
正如我们在上面 df1
的例子中看到的,对于 category -> B
, lenStart=2
不等于 lenEnd=1
。在 df1
的所有行中,lenStart = lenEnd
或 lenStart = lenEnd+1
对于 df1
中 lenStart = lenEnd+1
的所有行,我想取timestamp
(适当的category
)的最大值,并将其放在end_time_array
中的数组中。我该怎么做?
以下是使用 df2
中的信息处理 df1
后的预期输出
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 2|
+--------+------------------------------------------+------------------------------------------+--------+------+
这应该适用于 Spark 1.5+:
import pyspark.sql.functions as F
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
df4 = df2.join(df3, 'Category').groupby('Category').agg(F.max('timestamp').alias('max'))
df5 = df1.join(df4, 'Category', 'left')
df1_changed = df5.withColumn('end_time_array', F.when(F.col('max').isNull(),
F.col('end_time_array')).otherwise(F.concat(F.col('end_time_array'),
F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))
df1_changed
将有一个修改过的 end_time_array
列,当您请求的条件适用时,它会添加所需的值,否则,它保持不变。
我有一个包含以下列的 PySpark 数据框(比如 df1
)
1.> category
- 包含独特的类别类型
2.> start_time_array
- 按升序排列的时间戳数组
3.> end_time_array
- 按升序排列的时间戳数组
4.> lenStart
- start_time_array
5.> lenEnd
- end_time_array
以下是 df1
的示例:
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B| [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 1|
+--------+------------------------------------------+------------------------------------------+--------+------+
还有另一个数据框 df2
,它包含两列 category
和 timestamp
。 df2
包含与 df1
相同的 category
值,df1
中数组内的时间戳值是 df2
中时间戳的子集。以下是 df2
+--------+-------------------+
|category| timestamp|
+--------+-------------------+
| A|2017-01-16 00:00:00|
| A|2017-01-18 00:00:00|
| A|2017-01-25 00:00:00|
| A|2017-01-27 00:00:00|
| B|2017-02-14 00:00:00|
| B|2017-02-18 00:00:00|
| B|2017-02-21 00:00:00|
| B|2017-02-22 00:00:00|
| B|2017-02-24 00:00:00|
| B|2017-02-25 00:00:00|
+--------+-------------------+
正如我们在上面 df1
的例子中看到的,对于 category -> B
, lenStart=2
不等于 lenEnd=1
。在 df1
的所有行中,lenStart = lenEnd
或 lenStart = lenEnd+1
对于 df1
中 lenStart = lenEnd+1
的所有行,我想取timestamp
(适当的category
)的最大值,并将其放在end_time_array
中的数组中。我该怎么做?
以下是使用 df2
df1
后的预期输出
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 2|
+--------+------------------------------------------+------------------------------------------+--------+------+
这应该适用于 Spark 1.5+:
import pyspark.sql.functions as F
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
df4 = df2.join(df3, 'Category').groupby('Category').agg(F.max('timestamp').alias('max'))
df5 = df1.join(df4, 'Category', 'left')
df1_changed = df5.withColumn('end_time_array', F.when(F.col('max').isNull(),
F.col('end_time_array')).otherwise(F.concat(F.col('end_time_array'),
F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))
df1_changed
将有一个修改过的 end_time_array
列,当您请求的条件适用时,它会添加所需的值,否则,它保持不变。