从 pandas 到 pyspark - 将具有开始和结束日期的数据框转换为每日数据?
Going from pandas to pyspark - Convert dataframe with start and end date to daily data?
我有一个使用 Pandas 的 ETL 脚本,为了使其更具可扩展性,我正在尝试使用 Pyspark 重新创建它。到目前为止一切顺利,但对日常数据集的特定转换存在问题。每个 ID 都有一条记录,其中包含开始日期和结束日期
id age state start_date end_date
123 18 CA 2/17/2019 5/4/2019
223 24 AZ 1/17/2019 3/4/2019
我想为开始日和结束日之间的每一天创建一个记录,这样我就可以将每日 activity 数据加入其中。目标输出看起来像这样
id age state start_date
123 18 CA 2/17/2019
123 18 CA 2/18/2019
123 18 CA 2/19/2019
123 18 CA 2/20/2019
123 18 CA 2/21/2019
…
123 18 CA 5/2/2019
123 18 CA 5/3/2019
123 18 CA 5/4/2019
当然,对数据集中的所有 ID 及其各自的开始日期执行此操作。我能够在 Pandas 中使用以下方法做到这一点
melt = df.melt(id_vars=['id', 'age', 'state'], value_name='date').drop('variable', axis=1)
melt['date'] = pd.to_datetime(melt['date'])
melt = melt.groupby('id').apply(lambda x: x.set_index('date').resample('d').first())\
.ffill()\
.reset_index(level=1)\
.reset_index(drop=True)
但是我对 Pyspark 还很陌生(并且在 Pandas 中一直在努力解决这个问题)所以我被困在这里。非常感谢任何帮助 - 谢谢!
在 this post 中找到了解决方案。我的解决方案的关键是 explode 函数,它可以满足我的需要。
解决我具体例子的代码是
def date_range(t1, t2, step=60*60*24):
return [t1 + step*x for x in range(int((t2-t1)/step)+1)]
date_range_udf = udf(date_range, ArrayType(LongType()))
df = dataF.select("id",
expr("stack(2, 'start_date', start_date, 'end_date', end_date) as (class_date,date)"))
df_base = \
df.groupBy('id')\
.agg(min('date').cast('integer').alias('date_min'), max('date').cast('integer')\
.alias('date_max'))\
.withColumn("date", explode(date_range_udf("date_min", "date_max")))\
.drop('date_min', 'date_max')\
.withColumn("date", from_unixtime("date"))
它给出了以下输出(我可以用它来加入任何额外的数据)
我有一个使用 Pandas 的 ETL 脚本,为了使其更具可扩展性,我正在尝试使用 Pyspark 重新创建它。到目前为止一切顺利,但对日常数据集的特定转换存在问题。每个 ID 都有一条记录,其中包含开始日期和结束日期
id age state start_date end_date
123 18 CA 2/17/2019 5/4/2019
223 24 AZ 1/17/2019 3/4/2019
我想为开始日和结束日之间的每一天创建一个记录,这样我就可以将每日 activity 数据加入其中。目标输出看起来像这样
id age state start_date
123 18 CA 2/17/2019
123 18 CA 2/18/2019
123 18 CA 2/19/2019
123 18 CA 2/20/2019
123 18 CA 2/21/2019
…
123 18 CA 5/2/2019
123 18 CA 5/3/2019
123 18 CA 5/4/2019
当然,对数据集中的所有 ID 及其各自的开始日期执行此操作。我能够在 Pandas 中使用以下方法做到这一点
melt = df.melt(id_vars=['id', 'age', 'state'], value_name='date').drop('variable', axis=1)
melt['date'] = pd.to_datetime(melt['date'])
melt = melt.groupby('id').apply(lambda x: x.set_index('date').resample('d').first())\
.ffill()\
.reset_index(level=1)\
.reset_index(drop=True)
但是我对 Pyspark 还很陌生(并且在 Pandas 中一直在努力解决这个问题)所以我被困在这里。非常感谢任何帮助 - 谢谢!
在 this post 中找到了解决方案。我的解决方案的关键是 explode 函数,它可以满足我的需要。
解决我具体例子的代码是
def date_range(t1, t2, step=60*60*24):
return [t1 + step*x for x in range(int((t2-t1)/step)+1)]
date_range_udf = udf(date_range, ArrayType(LongType()))
df = dataF.select("id",
expr("stack(2, 'start_date', start_date, 'end_date', end_date) as (class_date,date)"))
df_base = \
df.groupBy('id')\
.agg(min('date').cast('integer').alias('date_min'), max('date').cast('integer')\
.alias('date_max'))\
.withColumn("date", explode(date_range_udf("date_min", "date_max")))\
.drop('date_min', 'date_max')\
.withColumn("date", from_unixtime("date"))
它给出了以下输出(我可以用它来加入任何额外的数据)