将以秒为单位的列转换为人类可读的持续时间

Transform column with seconds to human readable duration

我在 pyspark 中有以下数据框:

Name                 | Seconds

|Enviar solicitud ...| 1415

|Analizar mapa de ...| 1209|

|Modificar solicit...|  591|

|Entregar servicio...|91049|

我希望将 seconds 列转换为日期或时间戳(希望是日期),我正在尝试使用以下函数

def to_date(seconds=0):
    dat = ''
    if seconds == 0:
        dat = '0'
    if (seconds / 86400) >= 1:
        day = (int(seconds / 86400))
        seconds = (seconds - 86400 * int(seconds / 86400))
        dat = f'{day}d '
    if (seconds / 3600) >= 1:
        hour = (int(seconds / 3600))
        seconds = (seconds - 3600 * int(seconds / 3600))
        dat = dat + f'{hour}hr '
    if (seconds / 60) >= 1:
        minutes = (int(seconds / 60))
        dat = dat + f'{minutes}min'   
    else:
        return '0min'
    return dat

但是在 pyspark 中没有 Pandas .apply(to_date) 之类的简单方法,无论如何可以实现我想要做的事情吗?

预期输出

Analizar mapa de comparacion de presupuestos         1209         20min
Crear mapa de comparacion de presupuestos           12155     3hr 22min
Entregar servicios de bienes                        91049  1d 1hr 17min

这应该为您提供 DD:HH:MM:SS 格式的输出。

df = spark.createDataFrame([
    (1, 1209), 
    (2, 12155),
    (3, 91049)
], ("ID","timeSec"))


def convert(seconds):
    days = seconds // (24 * 3600) 
    seconds = seconds % (24 * 3600) 
    hour = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    seconds %= 60

    return "%02d:%02d:%02d:%02d" % (days, hour, minutes, seconds) 

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

apply_my_udf = udf(lambda z: convert(z), StringType())

df2 = df.withColumn("timeStr", apply_my_udf(df.timeSec))

df2.show()

+---+-------+-----------+
| ID|timeSec|    timeStr|
+---+-------+-----------+
|  1|   1209|00:00:20:09|
|  2|  12155|00:03:22:35|
|  3|  91049|01:01:17:29|
+---+-------+-----------+

我认为这可以在没有 UDF 的情况下实现,并且对于大数据来说它会更快并且可扩展。试试这个,让我知道我的逻辑是否有漏洞。

from pyspark.sql import functions as F
from pyspark.sql.functions import when
df.withColumn("Minutes", F.round((F.col("Seconds")/60),2))\
.withColumn("Hours", F.floor((F.col("Minutes")/60)))\
.withColumn("hourmin", F.floor(F.col("Minutes")-(F.col("Hours").cast("int") * 60)))\
.withColumn("Days", F.floor((F.col("Hours")/24)))\
.withColumn("Days2", F.col("Days")*24)\
.withColumn("Time", F.when((F.col("Hours")==0) &(F.col("Days")==0), F.concat(F.col("hourmin"),F.lit("min"))).when((F.col("Hours")!=0)&(F.col("Days")==0), F.concat(F.col("Hours"),F.lit("hr "),F.col("hourmin"),F.lit("min"))).when(F.col("Days")!=0, F.concat(F.col("Days"),F.lit("d "),(F.col("Hours")-F.col("Days2")),F.lit("hr "),F.col("hourmin"),F.lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2")\
.show()


+-----------------+-------+---------------+
|             Name|Seconds|           Time|
+-----------------+-------+---------------+
| Enviar solicitud|   1209|          20min|
| Analizar mapa de|  12155|      3hr 22min|
|Entregar servicio|  91049|   1d 1hr 17min|
|         example1|   1900|          31min|
|         example2|   2500|          41min|
|         example3|9282398|107d 10hr 26min|
+-----------------+-------+---------------+

Spark 中没有内置函数,但可以在没有 UDF 的情况下完成。您可以简单地使用除法和取模运算来计算它以获得不同的部分(天,小时,......),并连接以获得所需的格式。

对于 Spark 2.4+,您可以使用高阶函数 zip_with and array_join。首先创建 parts 列,其中包含来自 Seconds 列的天数、小时数、分钟数和秒数。然后将其与单位文字数组 array('d', 'hr', 'min', 'sec') 压缩,以将每个部分与其单位连接起来,最后使用逗号分隔符连接所有元素。

duration_parts = [(86400, 7), (3600, 24), (60, 60), (1, 60)]
exp = "zip_with(parts, array('d', 'hr', 'min', 'sec'), (x, y) -> IF(x > 0, concat(x, y), null))"

df.withColumn("parts", array(*[(floor(col("Seconds") / d)) % m for d, m in duration_parts]))\
  .withColumn("duration", array_join(expr(exp), ", "))\
  .drop("parts")\
  .show(truncate=False)

#+--------------------------------------------+-------+---------------------+
#|Name                                        |Seconds|duration             |
#+--------------------------------------------+-------+---------------------+
#|Analizar mapa de comparacion de presupuestos|1209   |20min, 9sec          |
#|Crear mapa de comparacion de presupuestos   |12155  |3hr, 22min, 35sec    |
#|Entregar servicios de bienes                |91049  |1d, 1hr, 17min, 29sec|
#+--------------------------------------------+-------+---------------------+

另一种方法是使用 concat 并添加 when 表达式,如果您不想要等于 0 的部分:

df.withColumn("duration", concat(
            floor(col("Seconds") / 86400), lit("d, "),
            floor(col("Seconds") % 86400 / 3600), lit("hr, "),
            floor((col("Seconds") % 86400) % 3600 / 60), lit("min, "),
            floor(((col("Seconds") % 86400) % 3600) % 60), lit("sec "),
        )).show(truncate=False)