Converting PySpark's consecutive withColumn to SQL
I need help converting the following function into a SQL query:
start_time :- 1649289600
end_time :- 1649375999
from pyspark.sql.functions import col, lit, from_unixtime, to_timestamp, abs, ceil

test_data = df.withColumn("from_timestamp",to_timestamp(lit(from_unixtime(col("start_time"),'MM-dd-yyyy HH:mm:ss:SSS')), 'MM-dd-yyyy HH:mm:ss:SSS')) \
.withColumn("to_timestamp",to_timestamp(lit(from_unixtime(col("end_time"),"MM-dd-yyyy HH:mm:ss:SSS")), 'MM-dd-yyyy HH:mm:ss:SSS')) \
.withColumn("DiffInSeconds", col("from_timestamp").cast("long") - col("to_timestamp").cast("long")) \
.withColumn("time_diff_hr", abs(ceil(col("DiffInSeconds")/3600))) \
.withColumn("minDiffInHours", col("DiffInSeconds")/3600 - ceil(col("DiffInSeconds")/3600)) \
.withColumn("time_diff_min",abs(ceil(col("minDiffInHours")*60))) \
.withColumn("minDiffInSec", col("minDiffInHours")*60 - ceil(col("minDiffInHours")*60)) \
.withColumn("time_diff_sec",abs(ceil(col("minDiffInSec")*60)))
I have tried many things, for example:
df2=sqlContext.sql("SELECT start,end,from_unixtime(cast(start as string), 'MM-dd-yyyy HH:mm:ss:SSS') AS start_date,from_unixtime(cast(end as string), 'MM-dd-yyyy HH:mm:ss:SSS') AS end_date from data1")
df2.show()
df2.createOrReplaceTempView('data2')
df4=sqlContext.sql("SELECT start,end,start_date,end_date,(end_date-start_date) as time_diff from data2")
But whenever I try to compute the difference, it returns null values.
Edit:
Output of data1:
+---+---+----+----------+----------+--------------------+--------------------+
| a| b| c| start| end| start_date| end_date|
+---+---+----+----------+----------+--------------------+--------------------+
| 1|4.0|GFG1|1649289600|1649375999|04-07-2022 00:00:...|04-07-2022 23:59:...|
+---+---+----+----------+----------+--------------------+--------------------+
If the requirement is just to convert to a timestamp, see the logic below:
spark.sql("select timestamp(from_unixtime(1649289600,'yyyy-MM-dd HH:mm:ss')) as start_time, timestamp(from_unixtime(1649375999,'yyyy-MM-dd HH:mm:ss')) as end_time" ).show(truncate=False)
+-------------------+-------------------+
|start_time |end_time |
+-------------------+-------------------+
|2022-04-07 00:00:00|2022-04-07 23:59:59|
+-------------------+-------------------+
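Applied to the data1 view from the question, the same conversion would look roughly like this (a sketch, not from the original answer; it assumes start and end are the epoch-seconds columns shown in the edit above):

# Minimal sketch: convert the epoch columns of the data1 temp view to timestamps
spark.sql("""
    select start,
           end,
           timestamp(from_unixtime(start, 'yyyy-MM-dd HH:mm:ss')) as start_time,
           timestamp(from_unixtime(end, 'yyyy-MM-dd HH:mm:ss')) as end_time
    from data1
""").show(truncate=False)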
In SQL's SELECT you can only reference existing columns. PySpark's withColumn works differently: it essentially creates a new df with an additional column, so that column can be used later on. Since you cannot do that in SQL, you have to build every new column directly from the start and end columns. (That is also why your end_date - start_date attempt returns null: it subtracts two formatted strings, which cannot be implicitly cast to numbers.)
df2 = sqlContext.sql(
"""
select
start,
end,
from_unixtime(start, 'yyyy-MM-dd HH:mm:ss') as from_timestamp,
from_unixtime(end, 'yyyy-MM-dd HH:mm:ss') as to_timestamp,
(start - end) as DiffInSeconds,
abs(ceil((start - end) / 3600)) as time_diff_hr,
((start - end) / 3600) - ceil((start - end) / 3600) as minDiffInHours,
abs(ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60)) as time_diff_min,
(((start - end) / 3600) - ceil((start - end) / 3600)) * 60 - ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60) as minDiffInSec,
abs(ceil(((((start - end) / 3600) - ceil((start - end) / 3600)) * 60 - ceil((((start - end) / 3600) - ceil((start - end) / 3600)) * 60)) * 60)) as time_diff_sec
from data1
"""
)
df2.show()
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+
# | start| end| from_timestamp| to_timestamp|DiffInSeconds|time_diff_hr| minDiffInHours|time_diff_min| minDiffInSec|time_diff_sec|
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+
# |1649289600|1649375999|2022-04-07 00:00:00|2022-04-07 23:59:59| -86399| 23|-0.9997222222222213| 59|-0.9833333333332774| 58|
# +----------+----------+-------------------+-------------------+-------------+------------+-------------------+-------------+-------------------+-------------+
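If the nested expressions become hard to read, one way to keep the withColumn-style layering in SQL is to build the intermediate columns step by step in CTEs and reuse each alias in the next step. A sketch under the same assumptions (a data1 view with start and end columns):

df3 = sqlContext.sql(
    """
    with step1 as (
        -- mirrors the first three withColumn calls
        select
            start,
            end,
            from_unixtime(start, 'yyyy-MM-dd HH:mm:ss') as from_timestamp,
            from_unixtime(end, 'yyyy-MM-dd HH:mm:ss') as to_timestamp,
            (start - end) as DiffInSeconds
        from data1
    ),
    step2 as (
        -- hour part, reusing DiffInSeconds by name
        select *,
            abs(ceil(DiffInSeconds / 3600)) as time_diff_hr,
            DiffInSeconds / 3600 - ceil(DiffInSeconds / 3600) as minDiffInHours
        from step1
    ),
    step3 as (
        -- minute part, reusing minDiffInHours by name
        select *,
            abs(ceil(minDiffInHours * 60)) as time_diff_min,
            minDiffInHours * 60 - ceil(minDiffInHours * 60) as minDiffInSec
        from step2
    )
    select *, abs(ceil(minDiffInSec * 60)) as time_diff_sec
    from step3
    """
)

Each CTE plays the role of one or two withColumn calls, so every intermediate alias (DiffInSeconds, minDiffInHours, minDiffInSec) can be referenced by name in the next step instead of being repeated inline.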