Insert extra rows in query result SQL

Given a table whose entries have irregular timestamps, I must insert "breaks" at fixed 5-minute intervals (the associated data can/will be NULL).

I was thinking of taking the start time, creating a subquery with a window function, and adding 5-minute intervals to the start time - but the only way I can think of to increment the value is with row_number.

WITH data AS (
    SELECT id, data,
           CAST(date_and_time AS double) * 1000 AS time_milliseconds
    FROM t1
), -- original data

start_times AS (
    SELECT id, MIN(CAST(date_and_time AS double) * 1000) AS start_time
    FROM t1
    GROUP BY id
), -- first timestamp for each id

boundries AS (
    SELECT t1.id,
           (ROW_NUMBER() OVER (PARTITION BY t1.id ORDER BY t1.date_and_time) - 1) * 300000
               + start_times.start_time AS boundry
    FROM t1
    INNER JOIN start_times ON start_times.id = t1.id
) -- increment the number of 5 min added on each row and later full join boundries table with original data

However, this limits me to the number of rows each id has in the original data table, and if the timestamps are spread out, there aren't enough rows to cover all the 5-minute intervals that need to be added.

Sample data:

initial data:

 |-----------|------------------|------------------|
 |   id      |     value        |    timestamp     |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01.011  | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30.041  |
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20.231  |
 |-----------|------------------|------------------|
 |     1     |    3             |    15:00:00.312  |

data after my query:

 |-----------|------------------|------------------|
 |   id      |     value        | timestamp (UNIX) |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01      | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:05:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:10:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:15:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:20:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|  <-- Jumping directly to 15:00:00 (WRONG! :( need to insert more 5 min breaks here )
 |     1     |    3             |    15:00:00      |  



I was thinking of creating a temporary table in HIVE and filling it with x rows representing the 5-minute intervals from the data's start time to its end time, but I couldn't find any way to do it.

Is there any way to do this with a "for loop"? Any suggestions would be appreciated.

Thanks

You can try calculating the difference between the current timestamp and the next one, dividing by 300 to get the number of ranges, generating a string of spaces of length num_ranges, and exploding it to generate the rows.
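The arithmetic can be sketched in plain Python (`generate_rows` is a hypothetical helper for illustration, not part of the Hive query):

```python
from datetime import datetime, timedelta

# Mirrors the Hive logic: the gap to the next timestamp divided by 300 s
# gives num_intervals; posexplode over split(space(num_intervals), ' ')
# then yields indices 0..num_intervals, where index 0 keeps the original row.
def generate_rows(ts, next_ts, step=300):
    if next_ts is None:
        return [ts]  # last row per id: no gap to fill
    num_intervals = int((next_ts - ts).total_seconds()) // step
    return [ts + timedelta(seconds=i * step) for i in range(num_intervals + 1)]

rows = generate_rows(datetime(2020, 1, 1, 12, 3, 30),
                     datetime(2020, 1, 1, 12, 20, 20))
# 1010 s gap -> 3 intervals -> 12:03:30, 12:08:30, 12:13:30, 12:18:30
```

This matches the rows the demo below produces for the 12:03:30 record.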

Demo:

with your_table as ( -- initial data example
    select stack(3,
        1, 3, '2020-01-01 12:00:01.011',
        1, 4, '2020-01-01 12:03:30.041',
        1, 5, '2020-01-01 12:20:20.231'
    ) as (id, value, ts)
)

select id, value, ts, next_ts,
       diff_sec, num_intervals,
       from_unixtime(unix_timestamp(ts) + h.i * 300) as new_ts,
       coalesce(from_unixtime(unix_timestamp(ts) + h.i * 300), ts) as calculated_timestamp
from
(
    select id, value, ts, next_ts,
           (unix_timestamp(next_ts) - unix_timestamp(ts)) as diff_sec,
           floor((unix_timestamp(next_ts) - unix_timestamp(ts)) / 300 -- diff in seconds / 5 min
                ) as num_intervals
    from
    (
        select id, value, ts, lead(ts) over (order by ts) as next_ts
        from your_table
    ) s
) s
lateral view outer posexplode(split(space(cast(s.num_intervals as int)), ' ')) h as i, x -- this will generate rows

Result:

id  value   ts                      next_ts                 diff_sec    num_intervals   new_ts              calculated_timestamp
1   3       2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209          0              2020-01-01 12:00:01 2020-01-01 12:00:01
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:03:30 2020-01-01 12:03:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:08:30 2020-01-01 12:08:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:13:30 2020-01-01 12:13:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:18:30 2020-01-01 12:18:30
1   5       2020-01-01 12:20:20.231 \N                      \N           \N             \N                  2020-01-01 12:20:20.231

The additional rows were added. I kept all the intermediate columns for debugging purposes.
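The row generation hinges on a small quirk worth noting: splitting a string of n spaces on `' '` yields n+1 elements. A quick plain-Python check of that arithmetic:

```python
# Emulate Hive's split(space(n), ' '): a string of n spaces split on ' '
# produces n + 1 empty strings, so posexplode emits indices 0..n -- one
# index for the original row plus n inserted 5-minute breaks.
def exploded_indices(n):
    return list(range(len((' ' * n).split(' '))))

indices = exploded_indices(3)  # the 1010-second gap above: floor(1010/300) = 3
```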

A recursive query might be helpful here, but Hive does not support these: more info

You could consider creating the table outside of hive or writing a UDF.

Either way, this query may be expensive, so depending on how frequently you run it, materialized views/tables are recommended.

The example shows a UDF `inbetween` created with pyspark to run the query. It:

  1. generates values between the minimum and maximum timestamps from the dataset
  2. creates a temporary table `intervals` using CTEs and the UDF
  3. generates all possible intervals using an expensive cross join in `possible_records`
  4. uses a left join to retrieve the records with actual values (for demonstration purposes I represented the timestamp values as time strings)
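Step 1 can be checked outside Spark; a minimal sketch of the `inbetween` logic, using seconds since midnight in place of real unix timestamps for illustration:

```python
# Same logic as the pyspark UDF below: timestamps from min to max
# (exclusive) in 5-minute (300 s) steps.
inbetween = lambda min_value, max_value: [*range(min_value, max_value, 5 * 60)]

start = 12 * 3600 + 1   # 12:00:01 as seconds since midnight
end = 15 * 3600         # 15:00:00
intervals = inbetween(start, end)
# first interval starts at 12:00:01, each subsequent one 300 s later
```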

The code below shows how this can be evaluated using hive.

Sample code


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, ArrayType

# return timestamps from min_value to max_value (exclusive) in 5-minute steps
inbetween = lambda min_value, max_value: [*range(min_value, max_value, 5 * 60)]

udf_inbetween = udf(inbetween, ArrayType(IntegerType()))
sqlContext.udf.register("inbetween", udf_inbetween)

sqlContext.sql("""
WITH max_timestamp(t) as (
  select max(timestamp) as t from initial_data2
),
min_timestamp(t) as (
  select min(timestamp) as t from initial_data2
),
intervals as (
   select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
   min_timestamp mint, max_timestamp maxt
),
unique_ids as (
  select distinct id from initial_data2
),
interval_times as (
   select interval_time from (
   select 
       cast(from_unixtime(interval_time) as timestamp) as interval_time 
   from 
       intervals
   UNION
   select distinct d.timestamp as interval_time from initial_data2 d
   )
   order by interval_time asc
),
possible_records as (
   select
      distinct 
      d.id,
      i.interval_time
   FROM
      interval_times i, unique_ids d
   
)
select 
    p.id,
    d.value,
    split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
  possible_records p
LEFT JOIN
   initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time

ORDER BY p.id, p.interval_time
""").show(20)


Output

+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
|  1|    3| 12:00:01|
|  1|    4| 12:03:30|
|  1| null| 12:05:01|
|  1| null| 12:10:01|
|  1|    5| 12:12:20|
|  1| null| 12:15:01|
|  1| null| 12:20:01|
|  1| null| 12:25:01|
|  1| null| 12:30:01|
|  1| null| 12:35:01|
|  1| null| 12:40:01|
|  1| null| 12:45:01|
|  1| null| 12:50:01|
|  1| null| 12:55:01|
|  1| null| 13:00:01|
|  1| null| 13:05:01|
|  1| null| 13:10:01|
|  1| null| 13:15:01|
|  1| null| 13:20:01|
|  1| null| 13:25:01|
+---+-----+---------+

only showing top 20 rows

Data preparation for reproducing


from pyspark.sql import Row  # needed for Row(**entry) below

raw_data1 = [
    {"id": 1, "value": 3, "timestam": "12:00:01"},
    {"id": 1, "value": 4, "timestam": "12:03:30"},
    {"id": 1, "value": 5, "timestam": "12:12:20"},
    {"id": 1, "value": 3, "timestam": "15:00:00"},
]
raw_data = [*map(lambda entry: Row(**entry), raw_data1)]

initial_data = sqlContext.createDataFrame(raw_data,schema="id int, value int, timestam string ")
initial_data.createOrReplaceTempView('initial_data')

sqlContext.sql("create or replace temp view initial_data2 as select id,value,cast(timestam as timestamp) as timestamp from initial_data")