Apache Beam 在无限侧输入时被阻塞

Apache beam blocked on unbounded side input

我的问题与另一个问题非常相似post:

但是,我尝试了那里的解决方案(将 GlobalWindows() 应用于侧输入),它似乎没有解决我的问题。

我有一个带有 Python SDK 的数据流管道(但我正在使用 DirectRunner 进行调试),其中主要输入是来自 PubSub 的日志,副输入是来自几乎不变的数据库的关联数据。我想加入这两者,以便每个日志都与来自相同近似时间的侧输入数据配对。可以删除没有关联日志的多余输入。

我看到的行为是管道似乎作为单个线程运行。它首先处理所有侧输入元素,然后开始处理主要输入元素。如果侧输入是有界的(非流式),这很好,管道可以合并输入和 运行 完成。但是,如果侧输入是无界的(流式传输),则主输入将无限期阻塞,同时显然在等待侧输入处理完成。

为了说明行为,我在下面制作了简化的测试用例。

class Logger(apache_beam.DoFn):

  def __init__(self, name):
    self._name = name

  def process(self, element, w=apache_beam.DoFn.WindowParam,
              ts=apache_beam.DoFn.TimestampParam):
    logging.error('%s: %s', self._name, element)
    yield element

def cross_join(left, rights):
  for right in rights:
    yield (left, right)

def main():
  start = timestamp.Timestamp.now()

  # Bounded side inputs work OK.
  stop = start + 20

  # Unbounded side inputs appear to block execution of main input
  # processing.
  #stop = timestamp.MAX_TIMESTAMP

  side_interval = 5
  main_interval = 1

  side_input = (
      pipeline
      | PeriodicImpulse(
          start_timestamp=start,
          stop_timestamp=stop,
          fire_interval=side_interval,
          apply_windowing=True)
      | apache_beam.Map(lambda x: ('side', x))
      | apache_beam.ParDo(Logger('side_input'))
  )
  main_input = (
      pipeline
      | PeriodicImpulse(
          start_timestamp=start, stop_timestamp=stop,
          fire_interval=main_interval, apply_windowing=True)
      | apache_beam.Map(lambda x: ('main', x))
      | apache_beam.ParDo(Logger('main_input'))
      | 'CrossJoin' >> apache_beam.FlatMap(
          cross_join, rights=apache_beam.pvalue.AsIter(side_input))
      | 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
  )
  pipeline.run()

我遗漏了一些阻止主输入与副输入并行处理的东西吗?

主输入只有在水印通过相应的侧输入windowing时才能前进。请参阅 programming guide 中的详细信息。您可能需要 window 主输入和副输入,并确保 PeriodicImpulse 正确推进水印。

使用 中的示例,在某些情况下,我能够让侧输入和主输入同时工作。答案是将 GlobalWindows() 应用于 side_input.

side_input = ( pipeline
  | PeriodicImpulse(fire_interval=300, apply_windowing=False)
  | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
      trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
      accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | ...
)

根据实验,我的结论是存在侧输入上的 PeriodicImpulse 导致主输入阻塞的情况,如下所示:

Case 1: GOOD
GlobalWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 2: BAD
FixedWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 3: BAD
GlobalWindow / FixedWindow
Main input = PeriodicImpulse
Side input = PeriodicImpulse
Case 4: GOOD
FixedWindow
Main input = PubSub
Side input = PubSub

我现在的问题是副输入时间戳没有与主输入正确对齐whosebug.com/q/72382440