使用窗口缓慢更新侧输入不起作用
Slowly updating side input using windowing does not work
我遵循了 Slowly updating side input using windowing example 的代码片段,但它没有打印任何内容。
它只运行 side_input 而不是整个。
为此,我使用 DirectRunner。
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
def cross_join(left, rights):
for x in rights:
yield left, x
if __name__ == '__main__':
data = list(range(1, 100))
pattern = 'pat'
main_interval = 10
side_interval = 5
pipeline = beam.Pipeline()
side_input = (
pipeline
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=side_interval, apply_windowing=True)
| 'MapToFileName' >> beam.Map(lambda x: pattern + str(x)))
main_input = (
pipeline
| 'MpImpulse' >> beam.Create(data)
| 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(window.FixedWindows(main_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(cross_join, rights=beam.pvalue.AsIter(side_input))
| 'log' >> beam.Map(print))
res = pipeline.run()
res.wait_until_finish()
感谢和问候。
这是因为PythonFnApiRunner还不支持流模式。它正在尝试 运行 PeriodicImpulse
在执行管道的下一阶段之前完成。 BEAM-7514.
目前正在处理此问题
不幸的是,看起来可以在 Python 中进行流式传输的 BundleBasedDirectRunner
在 PeriodicImpulse
中也有问题。
您可以尝试使用“真实的”运行ner,例如Flink 在本地模式下,通过将 runner='Flink'
传递给您的管道创建。
另请注意,您的副输入也需要 windowed,副输入和主输入将按 window 排列。
我遵循了 Slowly updating side input using windowing example 的代码片段,但它没有打印任何内容。 它只运行 side_input 而不是整个。 为此,我使用 DirectRunner。
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
def cross_join(left, rights):
for x in rights:
yield left, x
if __name__ == '__main__':
data = list(range(1, 100))
pattern = 'pat'
main_interval = 10
side_interval = 5
pipeline = beam.Pipeline()
side_input = (
pipeline
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=side_interval, apply_windowing=True)
| 'MapToFileName' >> beam.Map(lambda x: pattern + str(x)))
main_input = (
pipeline
| 'MpImpulse' >> beam.Create(data)
| 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(window.FixedWindows(main_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(cross_join, rights=beam.pvalue.AsIter(side_input))
| 'log' >> beam.Map(print))
res = pipeline.run()
res.wait_until_finish()
感谢和问候。
这是因为PythonFnApiRunner还不支持流模式。它正在尝试 运行 PeriodicImpulse
在执行管道的下一阶段之前完成。 BEAM-7514.
不幸的是,看起来可以在 Python 中进行流式传输的 BundleBasedDirectRunner
在 PeriodicImpulse
中也有问题。
您可以尝试使用“真实的”运行ner,例如Flink 在本地模式下,通过将 runner='Flink'
传递给您的管道创建。
另请注意,您的副输入也需要 windowed,副输入和主输入将按 window 排列。