制作新数据框(PySpark)的有效方法是什么?
What is efficient way to make a new dataframe (PySpark)?
我有一个像这样的数据框:
+---------------+-------+
| date | ID | count |
+--------+------+-------+
|20170101| 258 | 1003 |
|20170102| 258 | 13 |
|20170103| 258 | 1 |
|20170104| 258 | 108 |
|20170109| 258 | 25 |
| ... | ... | ... |
|20170101| 2813 | 503 |
|20170102| 2813 | 139 |
| ... | ... | ... |
|20170101| 4963 | 821 |
|20170102| 4963 | 450 |
| ... | ... | ... |
+--------+------+-------+
在我的数据框中,没有一些数据。
例如,此处缺少 ID 258
的日期 20170105
~ 20170108
缺少数据意味着没有出现(=计数== 0)。
但我也想添加计数为 0 的数据,如下所示:
+---------------+-------+
| date | ID | count |
+--------+------+-------+
|20170101| 258 | 1003 |
|20170102| 258 | 13 |
|20170103| 258 | 1 |
|20170104| 258 | 108 |
|20170105| 258 | 0 |
|20170106| 258 | 0 |
|20170107| 258 | 0 |
|20170108| 258 | 0 |
|20170109| 258 | 25 |
| ... | ... | ... |
|20170101| 2813 | 503 |
|20170102| 2813 | 139 |
| ... | ... | ... |
|20170101| 4963 | 821 |
|20170102| 4963 | 450 |
| ... | ... | ... |
+--------+------+-------+
dataframe 是不可变的,所以如果我想向这个 dataframe 添加零计数数据,
必须制作一个新的数据框。
但即使我有持续时间(20170101 ~ 20171231)和 ID 列表,我也不能使用 for loop
到数据帧。
如何制作新的数据框?
ps。我已经尝试过的是制作一个正确的数据框,然后比较 2 个数据框,制作另一个只有 0 个计数数据的数据框。最后合并“原始数据帧”和“0 计数数据帧”。我认为这不是一个好的和漫长的过程。请向我推荐一些其他有效的解决方案。
from pyspark.sql.functions import unix_timestamp, from_unixtime, struct, datediff, lead, col, explode, lit, udf
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DateType
from datetime import timedelta
#sample data
df = sc.parallelize([
['20170101', 258, 1003],
['20170102', 258, 13],
['20170103', 258, 1],
['20170104', 258, 108],
['20170109', 258, 25],
['20170101', 2813, 503],
['20170102', 2813, 139],
['20170101', 4963, 821],
['20170102', 4963, 450]]).\
toDF(('date', 'ID', 'count')).\
withColumn("date", from_unixtime(unix_timestamp('date', 'yyyyMMdd')).cast('date'))
df.show()
def date_list_fn(d):
return [d[0] + timedelta(days=x) for x in range(1, d[1])]
date_list_udf = udf(date_list_fn, ArrayType(DateType()))
w = Window.partitionBy('ID').orderBy('date')
#dataframe having missing date
df_missing = df.withColumn("diff", datediff(lead('date').over(w), 'date')).\
filter(col("diff") > 1).\
withColumn("date_list", date_list_udf(struct("date", "diff"))).\
withColumn("date_list", explode(col("date_list"))).\
select(col("date_list").alias("date"), "ID", lit(0).alias("count"))
#final dataframe by combining sample data with missing date dataframe
final_df = df.union(df_missing).sort(col("ID"), col("date"))
final_df.show()
示例数据:
+----------+----+-----+
| date| ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258| 13|
|2017-01-03| 258| 1|
|2017-01-04| 258| 108|
|2017-01-09| 258| 25|
|2017-01-01|2813| 503|
|2017-01-02|2813| 139|
|2017-01-01|4963| 821|
|2017-01-02|4963| 450|
+----------+----+-----+
输出为:
+----------+----+-----+
| date| ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258| 13|
|2017-01-03| 258| 1|
|2017-01-04| 258| 108|
|2017-01-05| 258| 0|
|2017-01-06| 258| 0|
|2017-01-07| 258| 0|
|2017-01-08| 258| 0|
|2017-01-09| 258| 25|
|2017-01-01|2813| 503|
|2017-01-02|2813| 139|
|2017-01-01|4963| 821|
|2017-01-02|4963| 450|
+----------+----+-----+
我有一个像这样的数据框:
+---------------+-------+
| date | ID | count |
+--------+------+-------+
|20170101| 258 | 1003 |
|20170102| 258 | 13 |
|20170103| 258 | 1 |
|20170104| 258 | 108 |
|20170109| 258 | 25 |
| ... | ... | ... |
|20170101| 2813 | 503 |
|20170102| 2813 | 139 |
| ... | ... | ... |
|20170101| 4963 | 821 |
|20170102| 4963 | 450 |
| ... | ... | ... |
+--------+------+-------+
在我的数据框中,没有一些数据。
例如,此处缺少 ID 258
的日期 20170105
~ 20170108
缺少数据意味着没有出现(=计数== 0)。
但我也想添加计数为 0 的数据,如下所示:
+---------------+-------+
| date | ID | count |
+--------+------+-------+
|20170101| 258 | 1003 |
|20170102| 258 | 13 |
|20170103| 258 | 1 |
|20170104| 258 | 108 |
|20170105| 258 | 0 |
|20170106| 258 | 0 |
|20170107| 258 | 0 |
|20170108| 258 | 0 |
|20170109| 258 | 25 |
| ... | ... | ... |
|20170101| 2813 | 503 |
|20170102| 2813 | 139 |
| ... | ... | ... |
|20170101| 4963 | 821 |
|20170102| 4963 | 450 |
| ... | ... | ... |
+--------+------+-------+
dataframe 是不可变的,所以如果我想向这个 dataframe 添加零计数数据, 必须制作一个新的数据框。
但即使我有持续时间(20170101 ~ 20171231)和 ID 列表,我也不能使用 for loop
到数据帧。
如何制作新的数据框?
ps。我已经尝试过的是制作一个正确的数据框,然后比较 2 个数据框,制作另一个只有 0 个计数数据的数据框。最后合并“原始数据帧”和“0 计数数据帧”。我认为这不是一个好的和漫长的过程。请向我推荐一些其他有效的解决方案。
from pyspark.sql.functions import unix_timestamp, from_unixtime, struct, datediff, lead, col, explode, lit, udf
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DateType
from datetime import timedelta
#sample data
df = sc.parallelize([
['20170101', 258, 1003],
['20170102', 258, 13],
['20170103', 258, 1],
['20170104', 258, 108],
['20170109', 258, 25],
['20170101', 2813, 503],
['20170102', 2813, 139],
['20170101', 4963, 821],
['20170102', 4963, 450]]).\
toDF(('date', 'ID', 'count')).\
withColumn("date", from_unixtime(unix_timestamp('date', 'yyyyMMdd')).cast('date'))
df.show()
def date_list_fn(d):
return [d[0] + timedelta(days=x) for x in range(1, d[1])]
date_list_udf = udf(date_list_fn, ArrayType(DateType()))
w = Window.partitionBy('ID').orderBy('date')
#dataframe having missing date
df_missing = df.withColumn("diff", datediff(lead('date').over(w), 'date')).\
filter(col("diff") > 1).\
withColumn("date_list", date_list_udf(struct("date", "diff"))).\
withColumn("date_list", explode(col("date_list"))).\
select(col("date_list").alias("date"), "ID", lit(0).alias("count"))
#final dataframe by combining sample data with missing date dataframe
final_df = df.union(df_missing).sort(col("ID"), col("date"))
final_df.show()
示例数据:
+----------+----+-----+
| date| ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258| 13|
|2017-01-03| 258| 1|
|2017-01-04| 258| 108|
|2017-01-09| 258| 25|
|2017-01-01|2813| 503|
|2017-01-02|2813| 139|
|2017-01-01|4963| 821|
|2017-01-02|4963| 450|
+----------+----+-----+
输出为:
+----------+----+-----+
| date| ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258| 13|
|2017-01-03| 258| 1|
|2017-01-04| 258| 108|
|2017-01-05| 258| 0|
|2017-01-06| 258| 0|
|2017-01-07| 258| 0|
|2017-01-08| 258| 0|
|2017-01-09| 258| 25|
|2017-01-01|2813| 503|
|2017-01-02|2813| 139|
|2017-01-01|4963| 821|
|2017-01-02|4963| 450|
+----------+----+-----+