Dataflow: Look up a previous event in an event stream

In short, what I'm looking for in Apache Beam on Google Dataflow is something like LAG in Azure Stream Analytics.

Using a window of X minutes, I'm receiving data:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

I need to compare data(n) with data(n-1). Following the previous example, it would look like this:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 
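For reference, the LAG-style comparison above can be written on an in-memory list. This is a plain-Python sketch, not Beam code: `lag_pairs`, `entering_indices`, and the `is_inside` predicate are hypothetical names, and "inside" is modeled here as an arbitrary threshold check since the question does not define it.

```python
from typing import Callable, List, Optional, Sequence, Tuple


def lag_pairs(values: Sequence[int]) -> List[Tuple[int, Optional[int]]]:
    """Pair each value with its predecessor (None for the first), like LAG."""
    return [(value, values[i - 1] if i > 0 else None) for i, value in enumerate(values)]


def entering_indices(values: Sequence[int], is_inside: Callable[[int], bool]) -> List[int]:
    """Return every n where data(n) is inside and data(n-1) is outside."""
    return [
        n
        for n, (current, previous) in enumerate(lag_pairs(values))
        if previous is not None and is_inside(current) and not is_inside(previous)
    ]


readings = [5, 15, 8, 20]
print(lag_pairs(readings))                            # [(5, None), (15, 5), (8, 15), (20, 8)]
print(entering_indices(readings, lambda v: v > 10))   # [1, 3]
```

The difficulty in a streaming pipeline is that consecutive values may fall into different windows, which is what the rest of this thread addresses.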

Is there any "practical" way to do this?

With Beam, as explained in the docs, state is maintained per key and per window. Therefore, you cannot access values from previous windows.

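To accomplish what you want, you may need a more complex pipeline design. My idea, developed here as an example, is to duplicate your messages in a ParDo:

  • Emit them unmodified to the main output.
  • At the same time, emit them to a side output with a one-window lag.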

For the second bullet point, we can add the duration of one window (WINDOW_SECONDS) to the element timestamp:

import apache_beam as beam

WINDOW_SECONDS = 10  # window size in seconds (10 s in the test below)

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets the unmodified element
    yield element
    # The same element is emitted to the side output with a 1-window lag added to its timestamp
    yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))
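Why does adding WINDOW_SECONDS work? With fixed windows at offset zero, a timestamp t belongs to the window starting at t - (t mod size), so t + size always lands in the very next window. The sketch below checks that arithmetic in plain Python; `fixed_window` is a hypothetical helper that mimics fixed-window assignment and is not part of the Beam API, and the 10-second size is assumed to match the test in this answer.

```python
from typing import Tuple

WINDOW_SECONDS = 10  # assumed window size, as in the test in this answer


def fixed_window(timestamp: int, size: int = WINDOW_SECONDS) -> Tuple[int, int]:
    """Return [start, end) of the fixed window (offset 0) that contains timestamp."""
    start = timestamp - timestamp % size
    return start, start + size


for t in (1, 9, 13):
    main_window = fixed_window(t)
    lag_window = fixed_window(t + WINDOW_SECONDS)
    # The shifted copy always lands in the window right after the original one
    print(t, main_window, lag_window)
# 1 (0, 10) (10, 20)
# 9 (0, 10) (10, 20)
# 13 (10, 20) (20, 30)
```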

We invoke the DoFn, specifying the correct output tags:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

Then we apply the same windowing scheme to both outputs, co-group them by key, and so on:

from apache_beam import window

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()
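To see the shape that CoGroupByKey should hand to the next step, here is a plain-Python simulation of grouping both outputs by key and fixed window. It is not Beam code: `cogroup_by_key_and_window`, the `(timestamp, (key, payload))` record layout, the 10-second window, and the arrival times of 1 s and 13 s are all assumptions chosen to resemble the test in this answer.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW_SECONDS = 10  # assumed window size, as in the test in this answer

Record = Tuple[int, Tuple[str, str]]  # (timestamp, (key, payload))
Grouped = Dict[Tuple[str, int], Tuple[List[str], List[str]]]


def cogroup_by_key_and_window(main: List[Record], lag: List[Record],
                              size: int = WINDOW_SECONDS) -> Grouped:
    """Group payloads from both inputs by (key, fixed-window start), like CoGroupByKey."""
    grouped: Grouped = defaultdict(lambda: ([], []))
    for tag_index, records in enumerate((main, lag)):
        for timestamp, (key, payload) in records:
            window_start = timestamp - timestamp % size
            grouped[(key, window_start)][tag_index].append(payload)
    return dict(grouped)


# Hypothetical arrival times: 1 s and 13 s (more than one window apart)
main = [(1, ("test", "test,120")), (13, ("test", "test,40"))]
# The lag copies are the same records shifted by one window
lag = [(t + WINDOW_SECONDS, record) for t, record in main]

for (key, start), values in sorted(cogroup_by_key_and_window(main, lag).items()):
    print(start, (key, values))
# 0 ('test', (['test,120'], []))
# 10 ('test', (['test,40'], ['test,120']))
# 20 ('test', ([], ['test,40']))
```

Each window therefore sees the current value in the first list and the previous window's value in the second, which is the layout the comparison step relies on.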

Finally, we have both values (old and new) available in the same ParDo:

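import logging

import apache_beam as beam

class CompareDoFn(beam.DoFn):

  def process(self, element):
    # element is (key, (main_output_values, lag_output_values)) from CoGroupByKey
    logging.info("Combined with previous vale: {}".format(element))

    # The previous window's value arrives through the lag output (second input)
    try:
      old_value = int(element[1][1][0].split(',')[1])
    except (IndexError, ValueError):
      old_value = 0

    # The current window's value arrives through the main output (first input)
    try:
      new_value = int(element[1][0][0].split(',')[1])
    except (IndexError, ValueError):
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    # Yield (rather than return) the pair so Beam emits it as one element
    yield (element[0], new_value - old_value)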
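The arithmetic in CompareDoFn can be checked outside Beam by feeding it the co-grouped elements that appear in the job output below. This is a plain-Python sketch; `parse_value` and `compare` are hypothetical helpers that mirror the DoFn's logic, and only the three input tuples are taken from the log.

```python
from typing import List, Tuple

Grouped = Tuple[str, Tuple[List[str], List[str]]]  # (key, (main values, lag values))


def parse_value(payloads: List[str]) -> int:
    """Read the integer after the comma in the first payload; default to 0 if absent."""
    try:
        return int(payloads[0].split(",")[1])
    except (IndexError, ValueError):
        return 0


def compare(element: Grouped) -> Tuple[str, int]:
    """Return (key, new value minus previous window's value)."""
    key, (new_payloads, old_payloads) = element
    return key, parse_value(new_payloads) - parse_value(old_payloads)


for element in [
    ("test", (["test,120"], [])),
    ("test", (["test,40"], ["test,120"])),
    ("test", ([], ["test,40"])),
]:
    print(compare(element))
# ('test', 120)
# ('test', -80)
# ('test', -40)
```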

To test this, I ran the pipeline with the direct runner and, in a separate shell, published two messages more than 10 seconds apart (in my case, WINDOW_SECONDS is 10 s):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

The job output shows the expected differences:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

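The full code for my example is here. Keep performance in mind, since every element is duplicated, but this approach makes sense if you need values to be available across two windows.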