How do I track state across runners in a Dataflow job?

I am currently creating a streaming Dataflow job that should perform a computation if and only if the "Ring" column of my data has incremented.

My Dataflow code:

job = (p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
         | "Parse Json" >> beam.Map(json.loads)
         | "ParDo Divisors" >> beam.ParDo(UpdateDelayTable()))

The data streaming in from Pub/Sub:

Ring [
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":2}
...]

I want my Dataflow pipeline to keep track of the current Ring number and trigger a function only if and only if the Ring number has incremented. How should I do this?

Pub/Sub

There is no guarantee that `{"Ring": 2}` will be received/sent by Pub/Sub after `{"Ring": 1}`.

It looks like you will have to enable message ordering on Pub/Sub so that messages are received in order. Also make sure that the Pub/Sub service actually receives the Ring data incrementally.
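For reference, a minimal publisher-side sketch of ordering keys (the `encode_ring_message` helper is hypothetical; the commented-out calls assume the `google-cloud-pubsub` client): messages published with the same `ordering_key` are delivered in publish order, provided message ordering is enabled on the subscription.

```python
import json


def encode_ring_message(ring):
    """Hypothetical helper: serialize a Ring record and pick an
    ordering key. Messages sharing an ordering key are delivered in
    publish order when ordering is enabled on the subscription."""
    data = json.dumps({'Ring': ring}).encode('utf-8')
    return data, 'ring-updates'


# Publishing sketch with google-cloud-pubsub (requires GCP setup):
# from google.cloud import pubsub_v1
# publisher = pubsub_v1.PublisherClient(
#     publisher_options=pubsub_v1.types.PublisherOptions(
#         enable_message_ordering=True))
# data, key = encode_ring_message(2)
# publisher.publish(topic_path, data, ordering_key=key)
```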

数据流

On the Dataflow side, you can then implement this with stateful processing.

Note, however, that the "state" of "Ring" is maintained per key (and per window). To do what you want, all elements need to have the same key and fall into the same window (the global window, in this case). That will make it a very "hot" key.
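To see why the key matters, here is a plain-Python toy model (not Beam code) of per-key read-modify-write state: elements under different keys keep separate Ring states and fire independently, so all elements must share one key for a single global Ring counter.

```python
from collections import defaultdict


def simulate_stateful(elements):
    """Toy model of per-key state: 'state' holds the last Ring value
    seen for each key; we fire only on a strict increase."""
    state = defaultdict(int)  # key -> last Ring value seen
    fired = []
    for key, payload in elements:
        if payload['Ring'] > state[key]:
            fired.append((key, payload['Ring']))
            state[key] = payload['Ring']
    return fired


# One shared key: fires once per increase.
same_key = [('all', {'Ring': r}) for r in [1, 1, 1, 2]]
print(simulate_stateful(same_key))  # [('all', 1), ('all', 2)]

# Two keys: each key tracks its own Ring, so both fire on Ring=1.
split = [('a', {'Ring': 1}), ('b', {'Ring': 1})]
print(simulate_stateful(split))     # [('a', 1), ('b', 1)]
```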

Example code:

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class RingFn(beam.DoFn):
  RING_STATE = ReadModifyWriteStateSpec(
      name='Ring', coder=coders.VarIntCoder())

  def process(self, element, ring=beam.DoFn.StateParam(RING_STATE)):
    # Stateful DoFns require a keyed input: each element is a
    # (key, value) pair, and state is scoped to that key.
    _, payload = element
    current_ring = ring.read() or 0
    if payload['Ring'] > current_ring:
        print('Carry out your computation here!')
        ring.write(payload['Ring'])


# Usage (pcoll must be a keyed PCollection):
pcoll | beam.ParDo(RingFn())

# Check your keys if you are not sure what they are.
pcoll | beam.Keys() | beam.Map(print)