PySpark 中多列的日期算法
Date Arithmetic with Multiple Columns in PySpark
我正在尝试使用 PySpark 数据框中的多列进行一些中等复杂的日期算法。基本上,我有一个名为 number
的列,它表示我需要过滤的 created_at
时间戳之后的周数。在 PostgreSQL 中,你可以乘以一个 interval based on the value in a column,但我似乎无法弄清楚如何在 PySpark 中使用 SQL API 或 [=21] =] API。如有任何帮助,我们将不胜感激!
import datetime
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
start_date = datetime.date(2020,1,1)
my_df = sc.parallelize([
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3, metric=10),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1, metric=20),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2, metric=20),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30),
]).toDF()
# This doesn't work!
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'")
# Neither does this!
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date()
可能 需要将日期转换为字符串,使用 python 中的 datetime
库将字符串转换为 datetime
对象,然后执行操作,但这似乎很疯狂。
好的,我找到了使用 expr
和内置 date_add
函数的方法。
from pyspark.sql.functions import expr, date_add
new_df = my_df.withColumn('test', expr('date_add(created_at, number*7)'))
filtered = new_df.filter(new_df.test > start_date)
filtered.show()
希望深入了解 how/why 这在一般情况下是有效的,但是,如果其他人想要添加!
我正在尝试使用 PySpark 数据框中的多列进行一些中等复杂的日期算法。基本上,我有一个名为 number
的列,它表示我需要过滤的 created_at
时间戳之后的周数。在 PostgreSQL 中,你可以乘以一个 interval based on the value in a column,但我似乎无法弄清楚如何在 PySpark 中使用 SQL API 或 [=21] =] API。如有任何帮助,我们将不胜感激!
import datetime
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
start_date = datetime.date(2020,1,1)
my_df = sc.parallelize([
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2, metric=10),
Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3, metric=10),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1, metric=20),
Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2, metric=20),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9, metric=30),
Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30),
]).toDF()
# This doesn't work!
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'")
# Neither does this!
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date()
可能 datetime
库将字符串转换为 datetime
对象,然后执行操作,但这似乎很疯狂。
好的,我找到了使用 expr
和内置 date_add
函数的方法。
from pyspark.sql.functions import expr, date_add
new_df = my_df.withColumn('test', expr('date_add(created_at, number*7)'))
filtered = new_df.filter(new_df.test > start_date)
filtered.show()
希望深入了解 how/why 这在一般情况下是有效的,但是,如果其他人想要添加!