在 Spark 中将日期转换为 ISO 周日期

Convert date to ISO week date in Spark

一列中有日期,如何创建包含 ISO week date 的列?

ISO周日期由周数周数组成。

示例数据框:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ('1977-12-31',),
    ('1978-01-01',),
    ('1978-01-02',),
    ('1978-12-31',),
    ('1979-01-01',),
    ('1979-12-30',),
    ('1979-12-31',),
    ('1980-01-01',)],
    ['my_date']
).select(F.col('my_date').cast('date'))

df.show()
#+----------+
#|   my_date|
#+----------+
#|1977-12-31|
#|1978-01-01|
#|1978-01-02|
#|1978-12-31|
#|1979-01-01|
#|1979-12-30|
#|1979-12-31|
#|1980-01-01|
#+----------+

想要的结果:

+----------+-------------+
|   my_date|iso_week_date|
+----------+-------------+
|1977-12-31|   1977-W52-6|
|1978-01-01|   1977-W52-7|
|1978-01-02|   1978-W01-1|
|1978-12-31|   1978-W52-7|
|1979-01-01|   1979-W01-1|
|1979-12-30|   1979-W52-7|
|1979-12-31|   1980-W01-1|
|1980-01-01|   1980-W01-2|
+----------+-------------+

首先,可以为 yearweekday 的列创建规则。然后,使用 concat_ws and lpad.

连接它们
week_from_prev_year = (F.month('my_date') == 1) & (F.weekofyear('my_date') > 9)
week_from_next_year = (F.month('my_date') == 12) & (F.weekofyear('my_date') == 1)
iso_year = F.when(week_from_prev_year, F.year('my_date') - 1) \
            .when(week_from_next_year, F.year('my_date') + 1) \
            .otherwise(F.year('my_date'))
iso_weekday = F.when(F.dayofweek('my_date') != 1, F.dayofweek('my_date')-1).otherwise(7)
iso_week_date = F.concat_ws('-', iso_year, F.lpad(F.weekofyear('my_date'), 3, 'W0'), iso_weekday)
df2 = df.withColumn('iso_week_date', iso_week_date)

df2.show()
#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+

你的解决方案已经很好了,也许你可以通过简化计算来缩短它:

  • iso_weekday = (dayofweek(my_date) + 5)%7 + 1
  • iso_year=year(date_add(my_date, 4 - iso_weekday))

这给你:

import pyspark.sql.functions as F

df.withColumn(
    'iso_week_date',
    F.concat_ws(
        "-",
        F.year(F.expr("date_add(my_date, 4 - (dayofweek(my_date) + 5) % 7 + 1)")),
        F.lpad(F.weekofyear('my_date'), 3, "W0"),
        (F.dayofweek('my_date') + 5) % 7 + 1
    )
).show()

#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+

Spark SQL extract 让这变得更容易。

  • iso_year = F.expr("EXTRACT(YEAROFWEEK FROM my_date)")
  • iso_weekday = F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")

因此,使用 concat_ws 构建其他答案:

import pyspark.sql.functions as F

df.withColumn(
    'iso_week_date',
    F.concat_ws(
        "-",
        F.expr("EXTRACT(YEAROFWEEK FROM my_date)"),
        F.lpad(F.weekofyear('my_date'), 3, "W0"),
        F.expr("EXTRACT(DAYOFWEEK_ISO FROM my_date)")
    )
).show()

#+----------+-------------+
#|   my_date|iso_week_date|
#+----------+-------------+
#|1977-12-31|   1977-W52-6|
#|1978-01-01|   1977-W52-7|
#|1978-01-02|   1978-W01-1|
#|1978-12-31|   1978-W52-7|
#|1979-01-01|   1979-W01-1|
#|1979-12-30|   1979-W52-7|
#|1979-12-31|   1980-W01-1|
#|1980-01-01|   1980-W01-2|
#+----------+-------------+