在 Spark 中将日期转换为 ISO 周日期
Convert date to ISO week date in Spark
一列中有日期,如何创建包含 ISO week date 的列?
ISO周日期由年、周数和周数组成。
- year 与使用
year
函数获取的年份不同。
- 周数 是简单的部分 - 它可以使用
weekofyear
. 获得
- weekday 应该 return 周一为 1,周日为 7,而 Spark 的
dayofweek
做不到。
示例数据框:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
('1977-12-31',),
('1978-01-01',),
('1978-01-02',),
('1978-12-31',),
('1979-01-01',),
('1979-12-30',),
('1979-12-31',),
('1980-01-01',)],
['my_date']
).select(F.col('my_date').cast('date'))
df.show()
#+----------+
#| my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+
想要的结果:
+----------+-------------+
| my_date|iso_week_date|
+----------+-------------+
|1977-12-31| 1977-W52-6|
|1978-01-01| 1977-W52-7|
|1978-01-02| 1978-W01-1|
|1978-12-31| 1978-W52-7|
|1979-01-01| 1979-W01-1|
|1979-12-30| 1979-W52-7|
|1979-12-31| 1980-W01-1|
|1980-01-01| 1980-W01-2|
+----------+-------------+
首先,可以为 year 和 weekday 的列创建规则。然后,使用 concat_ws
and lpad
.
连接它们
week_from_prev_year = (F.month('my_date') == 1) & (F.weekofyear('my_date') > 9)
week_from_next_year = (F.month('my_date') == 12) & (F.weekofyear('my_date') == 1)
iso_year = F.when(week_from_prev_year, F.year('my_date') - 1) \
.when(week_from_next_year, F.year('my_date') + 1) \
.otherwise(F.year('my_date'))
iso_weekday = F.when(F.dayofweek('my_date') != 1, F.dayofweek('my_date')-1).otherwise(7)
iso_week_date = F.concat_ws('-', iso_year, F.lpad(F.weekofyear('my_date'), 3, 'W0'), iso_weekday)
df2 = df.withColumn('iso_week_date', iso_week_date)
df2.show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+
你的解决方案已经很好了,也许你可以通过简化计算来缩短它:
iso_weekday
= (dayofweek(my_date) + 5)%7 + 1
iso_year
=year(date_add(my_date, 4 - iso_weekday))
这给你:
import pyspark.sql.functions as F
df.withColumn(
'iso_week_date',
F.concat_ws(
"-",
F.year(F.expr("date_add(my_date, 4 - (dayofweek(my_date) + 5) % 7 + 1)")),
F.lpad(F.weekofyear('my_date'), 3, "W0"),
(F.dayofweek('my_date') + 5) % 7 + 1
)
).show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+
Spark SQL extract
让这变得更容易。
iso_year
= F.expr("EXTRACT(YEAROFWEEK FROM my_date)")
iso_weekday
= F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
因此,使用 concat_ws
构建其他答案:
import pyspark.sql.functions as F
df.withColumn(
'iso_week_date',
F.concat_ws(
"-",
F.expr("EXTRACT(YEAROFWEEK FROM my_date)"),
F.lpad(F.weekofyear('my_date'), 3, "W0"),
F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
)
).show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+
一列中有日期,如何创建包含 ISO week date 的列?
ISO周日期由年、周数和周数组成。
- year 与使用
year
函数获取的年份不同。 - 周数 是简单的部分 - 它可以使用
weekofyear
. 获得
- weekday 应该 return 周一为 1,周日为 7,而 Spark 的
dayofweek
做不到。
示例数据框:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
('1977-12-31',),
('1978-01-01',),
('1978-01-02',),
('1978-12-31',),
('1979-01-01',),
('1979-12-30',),
('1979-12-31',),
('1980-01-01',)],
['my_date']
).select(F.col('my_date').cast('date'))
df.show()
#+----------+
#| my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+
想要的结果:
+----------+-------------+
| my_date|iso_week_date|
+----------+-------------+
|1977-12-31| 1977-W52-6|
|1978-01-01| 1977-W52-7|
|1978-01-02| 1978-W01-1|
|1978-12-31| 1978-W52-7|
|1979-01-01| 1979-W01-1|
|1979-12-30| 1979-W52-7|
|1979-12-31| 1980-W01-1|
|1980-01-01| 1980-W01-2|
+----------+-------------+
首先,可以为 year 和 weekday 的列创建规则。然后,使用 concat_ws
and lpad
.
week_from_prev_year = (F.month('my_date') == 1) & (F.weekofyear('my_date') > 9)
week_from_next_year = (F.month('my_date') == 12) & (F.weekofyear('my_date') == 1)
iso_year = F.when(week_from_prev_year, F.year('my_date') - 1) \
.when(week_from_next_year, F.year('my_date') + 1) \
.otherwise(F.year('my_date'))
iso_weekday = F.when(F.dayofweek('my_date') != 1, F.dayofweek('my_date')-1).otherwise(7)
iso_week_date = F.concat_ws('-', iso_year, F.lpad(F.weekofyear('my_date'), 3, 'W0'), iso_weekday)
df2 = df.withColumn('iso_week_date', iso_week_date)
df2.show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+
你的解决方案已经很好了,也许你可以通过简化计算来缩短它:
iso_weekday
=(dayofweek(my_date) + 5)%7 + 1
iso_year
=year(date_add(my_date, 4 - iso_weekday))
这给你:
import pyspark.sql.functions as F
df.withColumn(
'iso_week_date',
F.concat_ws(
"-",
F.year(F.expr("date_add(my_date, 4 - (dayofweek(my_date) + 5) % 7 + 1)")),
F.lpad(F.weekofyear('my_date'), 3, "W0"),
(F.dayofweek('my_date') + 5) % 7 + 1
)
).show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+
Spark SQL extract
让这变得更容易。
iso_year
=F.expr("EXTRACT(YEAROFWEEK FROM my_date)")
iso_weekday
=F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
因此,使用 concat_ws
构建其他答案:
import pyspark.sql.functions as F
df.withColumn(
'iso_week_date',
F.concat_ws(
"-",
F.expr("EXTRACT(YEAROFWEEK FROM my_date)"),
F.lpad(F.weekofyear('my_date'), 3, "W0"),
F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
)
).show()
#+----------+-------------+
#| my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31| 1977-W52-6|
#|1978-01-01| 1977-W52-7|
#|1978-01-02| 1978-W01-1|
#|1978-12-31| 1978-W52-7|
#|1979-01-01| 1979-W01-1|
#|1979-12-30| 1979-W52-7|
#|1979-12-31| 1980-W01-1|
#|1980-01-01| 1980-W01-2|
#+----------+-------------+