将以秒为单位的列转换为人类可读的持续时间
Transform column with seconds to human readable duration
我在 pyspark 中有以下数据框:
Name | Seconds
|Enviar solicitud ...| 1415
|Analizar mapa de ...| 1209|
|Modificar solicit...| 591|
|Entregar servicio...|91049|
我希望将 seconds
列转换为日期或时间戳(希望是日期),我正在尝试使用以下函数
def to_date(seconds=0):
dat = ''
if seconds == 0:
dat = '0'
if (seconds / 86400) >= 1:
day = (int(seconds / 86400))
seconds = (seconds - 86400 * int(seconds / 86400))
dat = f'{day}d '
if (seconds / 3600) >= 1:
hour = (int(seconds / 3600))
seconds = (seconds - 3600 * int(seconds / 3600))
dat = dat + f'{hour}hr '
if (seconds / 60) >= 1:
minutes = (int(seconds / 60))
dat = dat + f'{minutes}min'
else:
return '0min'
return dat
但是在 pyspark 中没有 Pandas .apply(to_date)
之类的简单方法,无论如何可以实现我想要做的事情吗?
预期输出:
Analizar mapa de comparacion de presupuestos 1209 20min
Crear mapa de comparacion de presupuestos 12155 3hr 22min
Entregar servicios de bienes 91049 1d 1hr 17min
这应该为您提供 DD:HH:MM:SS
格式的输出。
df = spark.createDataFrame([
(1, 1209),
(2, 12155),
(3, 91049)
], ("ID","timeSec"))
def convert(seconds):
days = seconds // (24 * 3600)
seconds = seconds % (24 * 3600)
hour = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return "%02d:%02d:%02d:%02d" % (days, hour, minutes, seconds)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
apply_my_udf = udf(lambda z: convert(z), StringType())
df2 = df.withColumn("timeStr", apply_my_udf(df.timeSec))
df2.show()
+---+-------+-----------+
| ID|timeSec| timeStr|
+---+-------+-----------+
| 1| 1209|00:00:20:09|
| 2| 12155|00:03:22:35|
| 3| 91049|01:01:17:29|
+---+-------+-----------+
我认为这可以在没有 UDF 的情况下实现,并且对于大数据来说它会更快并且可扩展。试试这个,让我知道我的逻辑是否有漏洞。
from pyspark.sql import functions as F
from pyspark.sql.functions import when
df.withColumn("Minutes", F.round((F.col("Seconds")/60),2))\
.withColumn("Hours", F.floor((F.col("Minutes")/60)))\
.withColumn("hourmin", F.floor(F.col("Minutes")-(F.col("Hours").cast("int") * 60)))\
.withColumn("Days", F.floor((F.col("Hours")/24)))\
.withColumn("Days2", F.col("Days")*24)\
.withColumn("Time", F.when((F.col("Hours")==0) &(F.col("Days")==0), F.concat(F.col("hourmin"),F.lit("min"))).when((F.col("Hours")!=0)&(F.col("Days")==0), F.concat(F.col("Hours"),F.lit("hr "),F.col("hourmin"),F.lit("min"))).when(F.col("Days")!=0, F.concat(F.col("Days"),F.lit("d "),(F.col("Hours")-F.col("Days2")),F.lit("hr "),F.col("hourmin"),F.lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2")\
.show()
+-----------------+-------+---------------+
| Name|Seconds| Time|
+-----------------+-------+---------------+
| Enviar solicitud| 1209| 20min|
| Analizar mapa de| 12155| 3hr 22min|
|Entregar servicio| 91049| 1d 1hr 17min|
| example1| 1900| 31min|
| example2| 2500| 41min|
| example3|9282398|107d 10hr 26min|
+-----------------+-------+---------------+
Spark 中没有内置函数,但可以在没有 UDF 的情况下完成。您可以简单地使用除法和取模运算来计算它以获得不同的部分(天,小时,......),并连接以获得所需的格式。
对于 Spark 2.4+,您可以使用高阶函数 zip_with
and array_join
。首先创建 parts
列,其中包含来自 Seconds
列的天数、小时数、分钟数和秒数。然后将其与单位文字数组 array('d', 'hr', 'min', 'sec')
压缩,以将每个部分与其单位连接起来,最后使用逗号分隔符连接所有元素。
duration_parts = [(86400, 7), (3600, 24), (60, 60), (1, 60)]
exp = "zip_with(parts, array('d', 'hr', 'min', 'sec'), (x, y) -> IF(x > 0, concat(x, y), null))"
df.withColumn("parts", array(*[(floor(col("Seconds") / d)) % m for d, m in duration_parts]))\
.withColumn("duration", array_join(expr(exp), ", "))\
.drop("parts")\
.show(truncate=False)
#+--------------------------------------------+-------+---------------------+
#|Name |Seconds|duration |
#+--------------------------------------------+-------+---------------------+
#|Analizar mapa de comparacion de presupuestos|1209 |20min, 9sec |
#|Crear mapa de comparacion de presupuestos |12155 |3hr, 22min, 35sec |
#|Entregar servicios de bienes |91049 |1d, 1hr, 17min, 29sec|
#+--------------------------------------------+-------+---------------------+
另一种方法是使用 concat
并添加 when
表达式,如果您不想要等于 0 的部分:
df.withColumn("duration", concat(
floor(col("Seconds") / 86400), lit("d, "),
floor(col("Seconds") % 86400 / 3600), lit("hr, "),
floor((col("Seconds") % 86400) % 3600 / 60), lit("min, "),
floor(((col("Seconds") % 86400) % 3600) % 60), lit("sec "),
)).show(truncate=False)
我在 pyspark 中有以下数据框:
Name | Seconds
|Enviar solicitud ...| 1415
|Analizar mapa de ...| 1209|
|Modificar solicit...| 591|
|Entregar servicio...|91049|
我希望将 seconds
列转换为日期或时间戳(希望是日期),我正在尝试使用以下函数
def to_date(seconds=0):
dat = ''
if seconds == 0:
dat = '0'
if (seconds / 86400) >= 1:
day = (int(seconds / 86400))
seconds = (seconds - 86400 * int(seconds / 86400))
dat = f'{day}d '
if (seconds / 3600) >= 1:
hour = (int(seconds / 3600))
seconds = (seconds - 3600 * int(seconds / 3600))
dat = dat + f'{hour}hr '
if (seconds / 60) >= 1:
minutes = (int(seconds / 60))
dat = dat + f'{minutes}min'
else:
return '0min'
return dat
但是在 pyspark 中没有 Pandas .apply(to_date)
之类的简单方法,无论如何可以实现我想要做的事情吗?
预期输出:
Analizar mapa de comparacion de presupuestos 1209 20min
Crear mapa de comparacion de presupuestos 12155 3hr 22min
Entregar servicios de bienes 91049 1d 1hr 17min
这应该为您提供 DD:HH:MM:SS
格式的输出。
df = spark.createDataFrame([
(1, 1209),
(2, 12155),
(3, 91049)
], ("ID","timeSec"))
def convert(seconds):
days = seconds // (24 * 3600)
seconds = seconds % (24 * 3600)
hour = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return "%02d:%02d:%02d:%02d" % (days, hour, minutes, seconds)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
apply_my_udf = udf(lambda z: convert(z), StringType())
df2 = df.withColumn("timeStr", apply_my_udf(df.timeSec))
df2.show()
+---+-------+-----------+
| ID|timeSec| timeStr|
+---+-------+-----------+
| 1| 1209|00:00:20:09|
| 2| 12155|00:03:22:35|
| 3| 91049|01:01:17:29|
+---+-------+-----------+
我认为这可以在没有 UDF 的情况下实现,并且对于大数据来说它会更快并且可扩展。试试这个,让我知道我的逻辑是否有漏洞。
from pyspark.sql import functions as F
from pyspark.sql.functions import when
df.withColumn("Minutes", F.round((F.col("Seconds")/60),2))\
.withColumn("Hours", F.floor((F.col("Minutes")/60)))\
.withColumn("hourmin", F.floor(F.col("Minutes")-(F.col("Hours").cast("int") * 60)))\
.withColumn("Days", F.floor((F.col("Hours")/24)))\
.withColumn("Days2", F.col("Days")*24)\
.withColumn("Time", F.when((F.col("Hours")==0) &(F.col("Days")==0), F.concat(F.col("hourmin"),F.lit("min"))).when((F.col("Hours")!=0)&(F.col("Days")==0), F.concat(F.col("Hours"),F.lit("hr "),F.col("hourmin"),F.lit("min"))).when(F.col("Days")!=0, F.concat(F.col("Days"),F.lit("d "),(F.col("Hours")-F.col("Days2")),F.lit("hr "),F.col("hourmin"),F.lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2")\
.show()
+-----------------+-------+---------------+
| Name|Seconds| Time|
+-----------------+-------+---------------+
| Enviar solicitud| 1209| 20min|
| Analizar mapa de| 12155| 3hr 22min|
|Entregar servicio| 91049| 1d 1hr 17min|
| example1| 1900| 31min|
| example2| 2500| 41min|
| example3|9282398|107d 10hr 26min|
+-----------------+-------+---------------+
Spark 中没有内置函数,但可以在没有 UDF 的情况下完成。您可以简单地使用除法和取模运算来计算它以获得不同的部分(天,小时,......),并连接以获得所需的格式。
对于 Spark 2.4+,您可以使用高阶函数 zip_with
and array_join
。首先创建 parts
列,其中包含来自 Seconds
列的天数、小时数、分钟数和秒数。然后将其与单位文字数组 array('d', 'hr', 'min', 'sec')
压缩,以将每个部分与其单位连接起来,最后使用逗号分隔符连接所有元素。
duration_parts = [(86400, 7), (3600, 24), (60, 60), (1, 60)]
exp = "zip_with(parts, array('d', 'hr', 'min', 'sec'), (x, y) -> IF(x > 0, concat(x, y), null))"
df.withColumn("parts", array(*[(floor(col("Seconds") / d)) % m for d, m in duration_parts]))\
.withColumn("duration", array_join(expr(exp), ", "))\
.drop("parts")\
.show(truncate=False)
#+--------------------------------------------+-------+---------------------+
#|Name |Seconds|duration |
#+--------------------------------------------+-------+---------------------+
#|Analizar mapa de comparacion de presupuestos|1209 |20min, 9sec |
#|Crear mapa de comparacion de presupuestos |12155 |3hr, 22min, 35sec |
#|Entregar servicios de bienes |91049 |1d, 1hr, 17min, 29sec|
#+--------------------------------------------+-------+---------------------+
另一种方法是使用 concat
并添加 when
表达式,如果您不想要等于 0 的部分:
df.withColumn("duration", concat(
floor(col("Seconds") / 86400), lit("d, "),
floor(col("Seconds") % 86400 / 3600), lit("hr, "),
floor((col("Seconds") % 86400) % 3600 / 60), lit("min, "),
floor(((col("Seconds") % 86400) % 3600) % 60), lit("sec "),
)).show(truncate=False)