使用窗口缓慢更新侧输入不起作用

Slowly updating side input using windowing does not work

我遵循了 Slowly updating side input using windowing example 的代码片段,但它没有打印任何内容。 它只运行 side_input 而不是整个。 为此,我使用 DirectRunner。

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window

def cross_join(left, rights):
    for x in rights:
        yield left, x

if __name__ == '__main__':
    data = list(range(1, 100))
    pattern = 'pat'
    main_interval = 10
    side_interval = 5
    pipeline = beam.Pipeline()
    side_input = (
            pipeline
            | 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=side_interval, apply_windowing=True)
            | 'MapToFileName' >> beam.Map(lambda x: pattern + str(x)))
    main_input = (
            pipeline
            | 'MpImpulse' >> beam.Create(data)
            | 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
            | 'WindowMpInto' >> beam.WindowInto(window.FixedWindows(main_interval)))
    result = (
            main_input
            | 'ApplyCrossJoin' >> beam.FlatMap(cross_join, rights=beam.pvalue.AsIter(side_input))
            | 'log' >> beam.Map(print))
    res = pipeline.run()
    res.wait_until_finish()

感谢和问候。

这是因为PythonFnApiRunner还不支持流模式。它正在尝试 运行 PeriodicImpulse 在执行管道的下一阶段之前完成。 BEAM-7514.

目前正在处理此问题

不幸的是,看起来可以在 Python 中进行流式传输的 BundleBasedDirectRunnerPeriodicImpulse 中也有问题。

您可以尝试使用“真实的”运行ner,例如Flink 在本地模式下,通过将 runner='Flink' 传递给您的管道创建。

另请注意,您的副输入也需要 windowed,副输入和主输入将按 window 排列。