Apache Beam 在无限侧输入时被阻塞
Apache beam blocked on unbounded side input
我的问题与另一个问题非常相似post:。
但是,我尝试了那里的解决方案(将 GlobalWindows() 应用于侧输入),它似乎没有解决我的问题。
我有一个带有 Python SDK 的数据流管道(但我正在使用 DirectRunner 进行调试),其中主要输入是来自 PubSub 的日志,副输入是来自几乎不变的数据库的关联数据。我想加入这两者,以便每个日志都与来自相同近似时间的侧输入数据配对。可以删除没有关联日志的多余输入。
我看到的行为是管道似乎作为单个线程运行。它首先处理所有侧输入元素,然后开始处理主要输入元素。如果侧输入是有界的(非流式),这很好,管道可以合并输入和 运行 完成。但是,如果侧输入是无界的(流式传输),则主输入将无限期阻塞,同时显然在等待侧输入处理完成。
为了说明行为,我在下面制作了简化的测试用例。
class Logger(apache_beam.DoFn):
def __init__(self, name):
self._name = name
def process(self, element, w=apache_beam.DoFn.WindowParam,
ts=apache_beam.DoFn.TimestampParam):
logging.error('%s: %s', self._name, element)
yield element
def cross_join(left, rights):
for right in rights:
yield (left, right)
def main():
start = timestamp.Timestamp.now()
# Bounded side inputs work OK.
stop = start + 20
# Unbounded side inputs appear to block execution of main input
# processing.
#stop = timestamp.MAX_TIMESTAMP
side_interval = 5
main_interval = 1
side_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start,
stop_timestamp=stop,
fire_interval=side_interval,
apply_windowing=True)
| apache_beam.Map(lambda x: ('side', x))
| apache_beam.ParDo(Logger('side_input'))
)
main_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start, stop_timestamp=stop,
fire_interval=main_interval, apply_windowing=True)
| apache_beam.Map(lambda x: ('main', x))
| apache_beam.ParDo(Logger('main_input'))
| 'CrossJoin' >> apache_beam.FlatMap(
cross_join, rights=apache_beam.pvalue.AsIter(side_input))
| 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
)
pipeline.run()
我遗漏了一些阻止主输入与副输入并行处理的东西吗?
主输入只有在水印通过相应的侧输入windowing时才能前进。请参阅 programming guide 中的详细信息。您可能需要 window 主输入和副输入,并确保 PeriodicImpulse 正确推进水印。
使用 中的示例,在某些情况下,我能够让侧输入和主输入同时工作。答案是将 GlobalWindows() 应用于 side_input.
side_input = ( pipeline
| PeriodicImpulse(fire_interval=300, apply_windowing=False)
| "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| ...
)
根据实验,我的结论是存在侧输入上的 PeriodicImpulse 导致主输入阻塞的情况,如下所示:
Case 1: GOOD
GlobalWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 2: BAD
FixedWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 3: BAD
GlobalWindow / FixedWindow
Main input = PeriodicImpulse
Side input = PeriodicImpulse
Case 4: GOOD
FixedWindow
Main input = PubSub
Side input = PubSub
我现在的问题是副输入时间戳没有与主输入正确对齐whosebug.com/q/72382440。
我的问题与另一个问题非常相似post:
但是,我尝试了那里的解决方案(将 GlobalWindows() 应用于侧输入),它似乎没有解决我的问题。
我有一个带有 Python SDK 的数据流管道(但我正在使用 DirectRunner 进行调试),其中主要输入是来自 PubSub 的日志,副输入是来自几乎不变的数据库的关联数据。我想加入这两者,以便每个日志都与来自相同近似时间的侧输入数据配对。可以删除没有关联日志的多余输入。
我看到的行为是管道似乎作为单个线程运行。它首先处理所有侧输入元素,然后开始处理主要输入元素。如果侧输入是有界的(非流式),这很好,管道可以合并输入和 运行 完成。但是,如果侧输入是无界的(流式传输),则主输入将无限期阻塞,同时显然在等待侧输入处理完成。
为了说明行为,我在下面制作了简化的测试用例。
class Logger(apache_beam.DoFn):
def __init__(self, name):
self._name = name
def process(self, element, w=apache_beam.DoFn.WindowParam,
ts=apache_beam.DoFn.TimestampParam):
logging.error('%s: %s', self._name, element)
yield element
def cross_join(left, rights):
for right in rights:
yield (left, right)
def main():
start = timestamp.Timestamp.now()
# Bounded side inputs work OK.
stop = start + 20
# Unbounded side inputs appear to block execution of main input
# processing.
#stop = timestamp.MAX_TIMESTAMP
side_interval = 5
main_interval = 1
side_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start,
stop_timestamp=stop,
fire_interval=side_interval,
apply_windowing=True)
| apache_beam.Map(lambda x: ('side', x))
| apache_beam.ParDo(Logger('side_input'))
)
main_input = (
pipeline
| PeriodicImpulse(
start_timestamp=start, stop_timestamp=stop,
fire_interval=main_interval, apply_windowing=True)
| apache_beam.Map(lambda x: ('main', x))
| apache_beam.ParDo(Logger('main_input'))
| 'CrossJoin' >> apache_beam.FlatMap(
cross_join, rights=apache_beam.pvalue.AsIter(side_input))
| 'CrossJoinLogger' >> apache_beam.ParDo(Logger('cross_join_output'))
)
pipeline.run()
我遗漏了一些阻止主输入与副输入并行处理的东西吗?
主输入只有在水印通过相应的侧输入windowing时才能前进。请参阅 programming guide 中的详细信息。您可能需要 window 主输入和副输入,并确保 PeriodicImpulse 正确推进水印。
使用
side_input = ( pipeline
| PeriodicImpulse(fire_interval=300, apply_windowing=False)
| "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| ...
)
根据实验,我的结论是存在侧输入上的 PeriodicImpulse 导致主输入阻塞的情况,如下所示:
Case 1: GOOD
GlobalWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 2: BAD
FixedWindow
Main input = PubSub
Side input = PeriodicImpulse
Case 3: BAD
GlobalWindow / FixedWindow
Main input = PeriodicImpulse
Side input = PeriodicImpulse
Case 4: GOOD
FixedWindow
Main input = PubSub
Side input = PubSub
我现在的问题是副输入时间戳没有与主输入正确对齐whosebug.com/q/72382440。