Flink SQL 中跳跃 window 的指数衰减移动平均线:施法时间

An exponentially decaying moving average over a hopping window in Flink SQL: Casting time

现在我们在 Flink 中有了 SQL 和精美的窗口,我正在尝试从他们的 SQL roadmap/preview 2017-03 post:

中获得 "what will be possible in future Flink releases for both the Table API and SQL." 引用的衰减移动平均线
table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

这是我的尝试(也受到 the calcite decaying example 的启发):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

Time 是处理时间,我们通过从 AppendStream Table 创建 write_position 得到的 proctime 为:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

我收到这个错误:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

我已经尝试将 proctime 转换为我所知道的所有其他类型(试图到达 NUMERIC 应许之地),但我只是找不到如何让它工作。

我错过了什么吗? proctime 是一种您无法转换的非常特殊的 'system change number' 时间吗?如果是这样,仍然必须有一些方法可以将它与 HOP_START(proctime,...) 值进行比较。

您可以使用 timestampDiff 减去两个时间点(参见 docs)。你这样用

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

其中时间点单位可以是秒、分钟、小时、天、月或年。

我还没有尝试过处理时间,但它确实适用于事件时间字段,所以希望它会。