如何使用pyspark从一个月中的某天获取工作日
How to get the weekday from day of month using pyspark
我有一个数据框log_df:
我根据以下代码生成了一个新的数据框:
from pyspark.sql.functions import split, regexp_extract
split_log_df = log_df.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_log_df.show(10, truncate=False)
新数据框如下:
我需要另一列显示星期几,创建它的最佳方式是什么?理想情况下,只需在 select.
中添加一个类似 udf 的字段
非常感谢。
已更新:我的问题与评论中的问题不同,我需要的是根据log_df中的字符串进行计算,而不是像评论那样基于时间戳,所以这不是一个重复的问题。谢谢
我终于自己解决了这个问题,这里是完整的解决方案:
- 导入date_format、日期时间、数据类型
- 首先,修改正则表达式以提取 01/Jul/1995
- 使用 func
将 01/Jul/1995 转换为 DateType
- 创建一个 udf dayOfWeek 以简短格式获取星期几(周一、周二、...)
- 使用 udf 将 DateType 01/Jul/1995 转换为星期六的工作日
我对我的解决方案不太满意,因为它似乎是曲折的,如果有人能想出更优雅的解决方案,我将不胜感激,提前谢谢。
我建议使用一些不同的方法
from pyspark.sql.functions import date_format
df.select('capturetime', date_format('capturetime', 'u').alias('dow_number'), date_format('capturetime', 'E').alias('dow_string'))
df3.show()
它给...
+--------------------+----------+----------+
| capturetime|dow_number|dow_string|
+--------------------+----------+----------+
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
我这样做是为了从以下日期获取工作日:
def get_weekday(date):
import datetime
import calendar
month, day, year = (int(x) for x in date.split('/'))
weekday = datetime.date(year, month, day)
return calendar.day_name[weekday.weekday()]
spark.udf.register('get_weekday', get_weekday)
用法示例:
df.createOrReplaceTempView("weekdays")
df = spark.sql("select DateTime, PlayersCount, get_weekday(Date) as Weekday from weekdays")
从 Spark 2.3 开始,您可以使用 dayofweek 函数
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.dayofweek.html
from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', dayofweek('my_timestamp'))
然而,这将一周的开始定义为星期日 = 1
如果您不想这样,而是要求星期一 = 1,那么您可以做一个不雅的软糖,例如在使用 dayofweek 函数之前减去 1 天,或者像这样修改结果
from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', ((dayofweek('my_timestamp')+5)%7)+1)
## Here is a potential solution with using UDF which can solve the issue.
# UDF’s are a black box to PySpark as it can’t apply any optimization and you
# will lose all the optimization PySpark does on Dataframe. so you should use
# Spark SQL built-in functions as these functions provide optimization.
# you should use UDF only when existing built-in SQL function doesn’t have it.
from dateutil.parser import parse
def findWeekday(dt):
dt = parse(dt)
return dt.strftime('%A')
weekDayUDF = udf(lambda x:findWeekday(x),StringType())
df.withColumn('weekday',weekDayUDF('ORDERDATE')).show()
+-------+---------------+--------+---------+
| SALES| ORDERDATE|MONTH_ID| weekday|
+-------+---------------+--------+---------+
| 2871.0| 2/24/2003 0:00| 2| Monday|
| 2765.9| 5/7/2003 0:00| 5|Wednesday|
|3884.34| 7/1/2003 0:00| 7| Tuesday|
| 3746.7| 8/25/2003 0:00| 8| Monday|
|5205.27|10/10/2003 0:00| 10| Friday|
|3479.76|10/28/2003 0:00| 10| Tuesday|
|2497.77|11/11/2003 0:00| 11| Tuesday|
|5512.32|11/18/2003 0:00| 11| Tuesday|
|2168.54| 12/1/2003 0:00| 12| Monday|
|4708.44| 1/15/2004 0:00| 1| Thursday|
|3965.66| 2/20/2004 0:00| 2| Friday|
我有一个数据框log_df:
我根据以下代码生成了一个新的数据框:
from pyspark.sql.functions import split, regexp_extract
split_log_df = log_df.select(regexp_extract('value', r'^([^\s]+\s)', 1).alias('host'),
regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),
regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))
split_log_df.show(10, truncate=False)
新数据框如下:
我需要另一列显示星期几,创建它的最佳方式是什么?理想情况下,只需在 select.
中添加一个类似 udf 的字段非常感谢。
已更新:我的问题与评论中的问题不同,我需要的是根据log_df中的字符串进行计算,而不是像评论那样基于时间戳,所以这不是一个重复的问题。谢谢
我终于自己解决了这个问题,这里是完整的解决方案:
- 导入date_format、日期时间、数据类型
- 首先,修改正则表达式以提取 01/Jul/1995
- 使用 func 将 01/Jul/1995 转换为 DateType
- 创建一个 udf dayOfWeek 以简短格式获取星期几(周一、周二、...)
- 使用 udf 将 DateType 01/Jul/1995 转换为星期六的工作日
我对我的解决方案不太满意,因为它似乎是曲折的,如果有人能想出更优雅的解决方案,我将不胜感激,提前谢谢。
我建议使用一些不同的方法
from pyspark.sql.functions import date_format
df.select('capturetime', date_format('capturetime', 'u').alias('dow_number'), date_format('capturetime', 'E').alias('dow_string'))
df3.show()
它给...
+--------------------+----------+----------+
| capturetime|dow_number|dow_string|
+--------------------+----------+----------+
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
|2017-06-05 10:05:...| 1| Mon|
我这样做是为了从以下日期获取工作日:
def get_weekday(date):
import datetime
import calendar
month, day, year = (int(x) for x in date.split('/'))
weekday = datetime.date(year, month, day)
return calendar.day_name[weekday.weekday()]
spark.udf.register('get_weekday', get_weekday)
用法示例:
df.createOrReplaceTempView("weekdays")
df = spark.sql("select DateTime, PlayersCount, get_weekday(Date) as Weekday from weekdays")
从 Spark 2.3 开始,您可以使用 dayofweek 函数 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.dayofweek.html
from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', dayofweek('my_timestamp'))
然而,这将一周的开始定义为星期日 = 1
如果您不想这样,而是要求星期一 = 1,那么您可以做一个不雅的软糖,例如在使用 dayofweek 函数之前减去 1 天,或者像这样修改结果
from pyspark.sql.functions import dayofweek
df.withColumn('day_of_week', ((dayofweek('my_timestamp')+5)%7)+1)
## Here is a potential solution with using UDF which can solve the issue.
# UDF’s are a black box to PySpark as it can’t apply any optimization and you
# will lose all the optimization PySpark does on Dataframe. so you should use
# Spark SQL built-in functions as these functions provide optimization.
# you should use UDF only when existing built-in SQL function doesn’t have it.
from dateutil.parser import parse
def findWeekday(dt):
dt = parse(dt)
return dt.strftime('%A')
weekDayUDF = udf(lambda x:findWeekday(x),StringType())
df.withColumn('weekday',weekDayUDF('ORDERDATE')).show()
+-------+---------------+--------+---------+
| SALES| ORDERDATE|MONTH_ID| weekday|
+-------+---------------+--------+---------+
| 2871.0| 2/24/2003 0:00| 2| Monday|
| 2765.9| 5/7/2003 0:00| 5|Wednesday|
|3884.34| 7/1/2003 0:00| 7| Tuesday|
| 3746.7| 8/25/2003 0:00| 8| Monday|
|5205.27|10/10/2003 0:00| 10| Friday|
|3479.76|10/28/2003 0:00| 10| Tuesday|
|2497.77|11/11/2003 0:00| 11| Tuesday|
|5512.32|11/18/2003 0:00| 11| Tuesday|
|2168.54| 12/1/2003 0:00| 12| Monday|
|4708.44| 1/15/2004 0:00| 1| Thursday|
|3965.66| 2/20/2004 0:00| 2| Friday|