如何使用pyspark从一个月中的某天获取工作日

How to get the weekday from day of month using pyspark

我有一个数据框log_df:

我根据以下代码生成了一个新的数据框:

from pyspark.sql.functions import split, regexp_extract 
split_log_df = log_df.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
                          regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
                          regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
                          regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
                          regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_log_df.show(10, truncate=False)

新数据框如下:

我需要另一列显示星期几,创建它的最佳方式是什么?理想情况下,只需在 select.

中添加一个类似 udf 的字段

非常感谢。

已更新:我的问题与评论中的问题不同,我需要的是根据log_df中的字符串进行计算,而不是像评论那样基于时间戳,所以这不是一个重复的问题。谢谢

我终于自己解决了这个问题,这里是完整的解决方案:

  1. 导入date_format、日期时间、数据类型
  2. 首先,修改正则表达式以提取 01/Jul/1995
  3. 使用 func
  4. 将 01/Jul/1995 转换为 DateType
  5. 创建一个 udf dayOfWeek 以简短格式获取星期几(周一、周二、...)
  6. 使用 udf 将 DateType 01/Jul/1995 转换为星期六的工作日

我对我的解决方案不太满意,因为它似乎是曲折的,如果有人能想出更优雅的解决方案,我将不胜感激,提前谢谢。

我建议使用一些不同的方法

from pyspark.sql.functions import date_format
df.select('capturetime', date_format('capturetime', 'u').alias('dow_number'), date_format('capturetime', 'E').alias('dow_string'))
df3.show()

它给...

+--------------------+----------+----------+
|         capturetime|dow_number|dow_string|
+--------------------+----------+----------+
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|
|2017-06-05 10:05:...|         1|       Mon|

我这样做是为了从以下日期获取工作日:

def get_weekday(date):
    import datetime
    import calendar
    month, day, year = (int(x) for x in date.split('/'))    
    weekday = datetime.date(year, month, day)
    return calendar.day_name[weekday.weekday()]

spark.udf.register('get_weekday', get_weekday)

用法示例:

df.createOrReplaceTempView("weekdays")
df = spark.sql("select DateTime, PlayersCount, get_weekday(Date) as Weekday from weekdays")

从 Spark 2.3 开始,您可以使用 dayofweek 函数 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.dayofweek.html

from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', dayofweek('my_timestamp'))

然而,这将一周的开始定义为星期日 = 1

如果您不想这样,而是要求星期一 = 1,那么您可以做一个不雅的软糖,例如在使用 dayofweek 函数之前减去 1 天,或者像这样修改结果

from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', ((dayofweek('my_timestamp')+5)%7)+1)
## Here is a potential solution with using UDF which can solve the issue.  

# UDF’s are a black box to PySpark as it can’t apply any optimization and you 
# will lose all the optimization PySpark does on Dataframe. so you should use 
# Spark SQL built-in functions as these functions provide optimization. 
# you should use UDF only when existing built-in SQL function doesn’t have it.


from dateutil.parser import parse

def findWeekday(dt):
    dt = parse(dt)
    return dt.strftime('%A')

weekDayUDF = udf(lambda x:findWeekday(x),StringType())

df.withColumn('weekday',weekDayUDF('ORDERDATE')).show()


+-------+---------------+--------+---------+
|  SALES|      ORDERDATE|MONTH_ID|  weekday|
+-------+---------------+--------+---------+
| 2871.0| 2/24/2003 0:00|       2|   Monday|
| 2765.9|  5/7/2003 0:00|       5|Wednesday|
|3884.34|  7/1/2003 0:00|       7|  Tuesday|
| 3746.7| 8/25/2003 0:00|       8|   Monday|
|5205.27|10/10/2003 0:00|      10|   Friday|
|3479.76|10/28/2003 0:00|      10|  Tuesday|
|2497.77|11/11/2003 0:00|      11|  Tuesday|
|5512.32|11/18/2003 0:00|      11|  Tuesday|
|2168.54| 12/1/2003 0:00|      12|   Monday|
|4708.44| 1/15/2004 0:00|       1| Thursday|
|3965.66| 2/20/2004 0:00|       2|   Friday|