Converting PySpark's consecutive withColumn to SQL

I need help converting the following function into a SQL query:

start_time = 1649289600
end_time = 1649375999

from pyspark.sql.functions import abs, ceil, col, from_unixtime, lit, to_timestamp

test_data = df.withColumn("from_timestamp", to_timestamp(lit(from_unixtime(col("start_time"), 'MM-dd-yyyy HH:mm:ss:SSS')), 'MM-dd-yyyy HH:mm:ss:SSS')) \
    .withColumn("to_timestamp", to_timestamp(lit(from_unixtime(col("end_time"), 'MM-dd-yyyy HH:mm:ss:SSS')), 'MM-dd-yyyy HH:mm:ss:SSS')) \
    .withColumn("DiffInSeconds", col("from_timestamp").cast("long") - col("to_timestamp").cast("long")) \
    .withColumn("time_diff_hr", abs(ceil(col("DiffInSeconds") / 3600))) \
    .withColumn("minDiffInHours", col("DiffInSeconds") / 3600 - ceil(col("DiffInSeconds") / 3600)) \
    .withColumn("time_diff_min", abs(ceil(col("minDiffInHours") * 60))) \
    .withColumn("minDiffInSec", col("minDiffInHours") * 60 - ceil(col("minDiffInHours") * 60)) \
    .withColumn("time_diff_sec", abs(ceil(col("minDiffInSec") * 60)))

I have tried many things, for example:

df2 = sqlContext.sql("SELECT start, end, from_unixtime(cast(start as string), 'MM-dd-yyyy HH:mm:ss:SSS') AS start_date, from_unixtime(cast(end as string), 'MM-dd-yyyy HH:mm:ss:SSS') AS end_date from data1")
df2.show()
df2.createOrReplaceTempView('data2')
df4 = sqlContext.sql("SELECT start, end, start_date, end_date, (end_date - start_date) as time_diff from data2")

But whenever I try to compute the difference, it returns null values.

Edit:

Output of data1:

+---+---+----+----------+----------+--------------------+--------------------+
|  a|  b|   c|     start|       end|          start_date|            end_date|
+---+---+----+----------+----------+--------------------+--------------------+
|  1|4.0|GFG1|1649289600|1649375999|04-07-2022 00:00:...|04-07-2022 23:59:...|
+---+---+----+----------+----------+--------------------+--------------------+
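
A minimal sketch (values copied from the data1 output above, and assuming an existing SparkSession named spark) to recreate the data1 view, so the queries in this post can be run end to end:

# assumes an existing SparkSession named `spark`; values taken from the output above
df = spark.createDataFrame(
    [(1, 4.0, "GFG1", 1649289600, 1649375999)],
    ["a", "b", "c", "start", "end"],
)
df.createOrReplaceTempView("data1")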

If the requirement is just to convert to a timestamp, see the logic below:

spark.sql("select timestamp(from_unixtime(1649289600,'yyyy-MM-dd HH:mm:ss')) as start_time, timestamp(from_unixtime(1649375999,'yyyy-MM-dd HH:mm:ss')) as end_time" ).show(truncate=False)

+-------------------+-------------------+
|start_time         |end_time           |
+-------------------+-------------------+
|2022-04-07 00:00:00|2022-04-07 23:59:59|
+-------------------+-------------------+
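
As for the null values in the original attempt: from_unixtime returns a string, and subtracting two non-numeric strings in Spark SQL implicitly casts both sides to double, which produces null. A minimal sketch illustrating this (the literals are just the formatted dates from the output above):

# string minus string: both sides are coerced to double, so non-numeric date strings become null
spark.sql("select '04-07-2022 23:59:59' - '04-07-2022 00:00:00' as time_diff").show()
# time_diff comes back null because neither string can be cast to a number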

In SQL's SELECT you can only reference columns that already exist. PySpark's withColumn works differently because it essentially creates a new df with the additional column, so later steps can use that column. Since you cannot do that in SQL, you have to build every new column from the start and end columns:

df2 = sqlContext.sql(
    """
    select
        start,
        end,
        from_unixtime(start, 'yyyy-MM-dd HH:mm:ss') as from_timestamp,
        from_unixtime(end, 'yyyy-MM-dd HH:mm:ss') as to_timestamp,
        (start - end) as DiffInSeconds,
        abs(ceil((start - end) / 3600)) as time_diff_hr,
        ((start - end) / 3600) - ceil((start - end) / 3600) as minDiffInHours,
        abs(ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60)) as time_diff_min,
        (((start - end) / 3600) - ceil((start - end) / 3600)) * 60 - ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60) as minDiffInSec,
        abs(ceil(((((start - end) / 3600) - ceil((start - end) / 3600)) * 60 - ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60)) * 60)) as time_diff_sec
    from data1
    """
)
df2.show()
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+
# |     start|       end|     from_timestamp|       to_timestamp|DiffInSeconds|time_diff_hr|     minDiffInHours|time_diff_min|       minDiffInSec|time_diff_sec|
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+
# |1649289600|1649375999|2022-04-07 00:00:00|2022-04-07 23:59:59|       -86399|          23|-0.9997222222222213|           59|-0.9833333333332774|           58|
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+