Testing Windowing in Apache Beam Python SDK

I'm trying to write tests for late and out-of-order data in an Apache Beam 2.35.0 pipeline written with the Python SDK. I'm following the Testing Unbounded Pipelines in Apache Beam blog post and trying to map its Java examples to Python by reading the Python SDK code.

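Here is my first attempt. The pipeline extracts the event time from each message and rewrites the message's timestamp with it. The comments explain my understanding of what happens: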

import apache_beam as beam
import pandas as pd
from apache_beam.testing import test_stream
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_out_of_order_data():
    stream = test_stream.TestStream()

    # First element arrives 15 seconds after its "data" time.
    stream.advance_watermark_to(ts("12:00:45"))
    stream.add_elements([{"time": ts("12:00:30"), "value": 1}])

    # Next element arrives 4 seconds after its data time and advances the output watermark past the previous window.
    stream.advance_watermark_to(ts("13:01:05"))
    stream.add_elements([{"time": ts("13:01:01"), "value": 2}])

    # Late element from the first window arrives.
    stream.advance_watermark_to(ts("13:01:06"))
    stream.add_elements([{"time": ts("12:00:31"), "value": 3}])

    stream.advance_watermark_to_infinity()

    with TestPipeline() as p:
        output = (
            p
            | "stream" >> stream
            | "rewrite timestamps" >> beam.Map(lambda e: beam.window.TimestampedValue(e, e["time"]))
            | "window" >> beam.WindowInto(beam.window.FixedWindows(60))
            | "add dummy key" >> beam.Map(lambda elem: (None, elem))
            | "group" >> beam.GroupByKey()
            | "remove dummy key" >> beam.Map(lambda elem: elem[1])
            | "sum" >> beam.Map(lambda elems: sum([e["value"] for e in elems]))
        )

        # I expected the output watermark to be advanced by the data, the late element to be discarded,
        # and `output` to be [1, 2]. It is not. This test passes.
        assert_that(output, equal_to([4, 2]))

def ts(time_str):
    return pd.Timestamp(f"2000-01-01 {time_str}").timestamp()

This is not what I expected: the late message, added when the watermark is at 13:01:06, still contributes to the 12:00 window.

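The Java examples include assertions that reference windows directly. I thought that might be what I was missing above, so I tried writing a minimal test that uses them: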

def test_window_simple_with_global_false():
    stream = test_stream.TestStream()
    stream.advance_watermark_to(ts("12:00:30"))
    stream.add_elements([1])
    stream.advance_watermark_to_infinity()

    with TestPipeline() as p:
        output = (
            p
            | "stream" >> stream
            | "window" >> beam.WindowInto(beam.window.FixedWindows(60))
            | "add dummy key" >> beam.Map(lambda elem: (None, elem))
            | "group" >> beam.GroupByKey()
            | "remove dummy key" >> beam.Map(lambda elem: elem[1])
        )
        assert_that(
            output,
            equal_to({beam.window.IntervalWindow(ts("12:00"), ts("12:01")): [1]}),
            use_global_window=False,
        )

I tried a few variations (removing the "add dummy key"/"group"/"remove dummy key" steps, adding reify_windows=True), but it looks like the matcher expects a window and receives a list:

ERROR    apache_beam.runners.direct.executor:executor.py:379 Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f44cd345400>, due to an exception.
 Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 843, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1636, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/testing/util.py", line 164, in _equal
    sorted_actual = sorted(actual)
  File "/home/aschmied/code/cme-injestion/cme-real-time/.tox/unit/lib/python3.8/site-packages/apache_beam/transforms/window.py", line 232, in __lt__
    if self.end != other.end:
AttributeError: 'list' object has no attribute 'end'

Any help is much appreciated.

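For your first question: there is no guarantee that "late" data will be dropped. The runner is merely allowed to drop data that arrives after the watermark has passed the end of its window (plus any allowed lateness). (In practice, on a distributed runner, whether data is late, and by how much, is often a non-deterministic matter of timing.) I agree it would be better if the local runner enforced this.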
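To make "late" concrete, here is a minimal, stdlib-only model of the rule (not Beam code), applied to the three elements in the question's first test. It assumes FixedWindows(60) and treats an element as droppable once the watermark at the moment it is added has reached its window's end plus `allowed_lateness`; Beam's own check compares against the window's max timestamp (end minus one microsecond), so this is an approximation. The helpers `fixed_window`, `is_droppable` and `classify`, and the stdlib `ts` stand-in for the question's pandas helper, are hypothetical names for illustration.

```python
from datetime import datetime, timezone
from typing import List, Tuple

WINDOW_SIZE = 60  # seconds, matching FixedWindows(60) in the question


def ts(time_str: str) -> float:
    """Stdlib stand-in for the question's pandas-based ts() helper (UTC)."""
    dt = datetime.strptime(f"2000-01-01 {time_str}", "%Y-%m-%d %H:%M:%S")
    return dt.replace(tzinfo=timezone.utc).timestamp()


def fixed_window(event_time: float, size: int = WINDOW_SIZE) -> Tuple[float, float]:
    """Return the [start, end) fixed window that contains event_time."""
    start = event_time - (event_time % size)
    return start, start + size


def is_droppable(event_time: float, watermark: float,
                 allowed_lateness: float = 0.0, size: int = WINDOW_SIZE) -> bool:
    """Simplified model: late (and droppable) once the watermark has reached
    the end of the element's window plus the allowed lateness."""
    _, end = fixed_window(event_time, size)
    return watermark >= end + allowed_lateness


# (watermark when the element is added, event time, value), from the question's TestStream
ARRIVALS = [
    ("12:00:45", "12:00:30", 1),
    ("13:01:05", "13:01:01", 2),
    ("13:01:06", "12:00:31", 3),
]


def classify(arrivals, allowed_lateness: float = 0.0) -> List[Tuple[int, bool]]:
    """Pair each value with whether the model considers it droppable."""
    return [
        (value, is_droppable(ts(event), ts(wm), allowed_lateness))
        for wm, event, value in arrivals
    ]


if __name__ == "__main__":
    for value, late in classify(ARRIVALS):
        print(value, "late (droppable)" if late else "on time")
```

Under this model only value 3 is droppable, which is the outcome the question expected; with two hours of allowed lateness it would be kept. Whether a runner actually drops it is still up to the runner.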

As for testing with reify_windows=True, I've found it much easier to do something like this:
assert_that(
  # pair each element with the window it was assigned to
  actual | beam.Map(lambda x, w=beam.DoFn.WindowParam: (x, w)),
  equal_to([(some_value, some_window)])  # placeholders for your expected pairs
)
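For the first test in the question, the (value, window) pairs that such an assertion would compare can be worked out by hand. The sketch below is a stdlib-only model, not Beam code: it groups events into 60-second fixed windows and sums them without dropping late data, which matches the [4, 2] the question observed. Windows are represented as (start, end) tuples of epoch seconds instead of beam.window.IntervalWindow objects, and `window_sums` and the `ts` stand-in are hypothetical helpers.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Tuple

WINDOW_SIZE = 60  # seconds, as in FixedWindows(60)

Window = Tuple[float, float]  # (start, end) in epoch seconds


def ts(time_str: str) -> float:
    """Stdlib stand-in for the question's pandas-based ts() helper (UTC)."""
    dt = datetime.strptime(f"2000-01-01 {time_str}", "%Y-%m-%d %H:%M:%S")
    return dt.replace(tzinfo=timezone.utc).timestamp()


def window_sums(events: List[Tuple[float, int]],
                size: int = WINDOW_SIZE) -> List[Tuple[int, Window]]:
    """Sum values per fixed window and pair each sum with its window.

    Late elements are NOT dropped, modelling what the question observed.
    """
    sums: Dict[Window, int] = defaultdict(int)
    for event_time, value in events:
        start = event_time - (event_time % size)
        sums[(start, start + size)] += value
    # (sum, window) pairs, sorted so the comparison is order-independent
    return sorted((total, window) for window, total in sums.items())


# (event time, value) pairs from the question's first test, in arrival order
EVENTS = [(ts("12:00:30"), 1), (ts("13:01:01"), 2), (ts("12:00:31"), 3)]

if __name__ == "__main__":
    for total, (start, end) in window_sums(EVENTS):
        print(total,
              datetime.fromtimestamp(start, timezone.utc).time(),
              datetime.fromtimestamp(end, timezone.utc).time())
```

In a real test the expected windows would be built with `beam.window.IntervalWindow(start, end)`, as in the question's second test, and paired with the corresponding sums.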