测试多次产生相同实例的步骤

Test a step that yields the same instance multiple times

我们有一个步骤可以在 Dataflow 的换行符上拆分 Pubsub 消息。我们有一个通过代码的测试,但它似乎在生产中失败了。看起来我们同时在管道的多个位置收到相同的 Pubsub 消息(至少据我所知)。

我们应该用另一种方式编写第一个测试吗?或者这只是关于在 Apache Beam 中不能做什么的惨痛教训?

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
import unittest

class SplitUpBatches(beam.DoFn):
    def process(self, msg):
        bodies = msg.data.split('\n')
        for body in bodies:
            msg.data = body.strip()
            yield msg

class TestSplitting(unittest.TestCase):
    body = """
    first
    second
    third
    """.strip()

    def test_incorrectly_passing(self):
        """Incorrectly passing"""
        msg = PubsubMessage(self.body, {})
        with TestPipeline() as p:
            assert_that(
                p
                | beam.Create([msg])
                | "split up batches" >> beam.ParDo(SplitUpBatches())
                | "map to data" >> beam.Map(lambda m: m.data),
                equal_to(['first', 'second', 'third']))

    def test_correctly_failing(self):
        """Failing, but not using a TestPipeline"""
        msg = PubsubMessage(self.body, {})
        messages = list(SplitUpBatches().process(msg))
        bodies = [m.data for m in messages]
        self.assertEqual(bodies, ['first', 'second', 'third'])
        # => AssertionError: ['third', 'third', 'third'] != ['first', 'second', 'third']

TL;DR: 是的,这是 不要在 Beam 中做的事情的一个例子:重新利用(变异)你的元素对象.

事实上,Beam 不鼓励改变你的转换的输入和输出,因为 Beam passes/buffers 如果你改变它们,这些对象会以各种方式受到影响。

这里的建议是为每个输出创建一个新的 PubsubMessage 实例。


详细解释

这是由于 Beam 序列化和传递数据的方式所致。

您可能知道 Beam 在单个 worker 中一起执行多个步骤——我们称之为阶段。您的管道执行如下操作:

read_data -> split_up_batches -> serialize all data -> perform assert

这个中间 serialize data 步骤是一个实现细节。原因是对于 Beam assert_that 我们将单个 PCollection 的所有数据收集到一台机器中,并执行断言(因此我们需要序列化所有元素并将它们发送到一台机器)。我们通过 GroupByKey 操作来做到这一点。

DirectRunner 接收到 PubsubMessage('first') 的第一个 yield 时,它会将其序列化并 立即将其传输到 GroupByKey - 所以您得到 'first', 'second', 'third' 结果 - 因为序列化会立即发生。

DataflowRunner 接收到 PubsubMessage('first') 的第一个 yield 时,它 对其进行缓冲 ,然后发送一批元素。您得到 'third', 'third', 'third' 结果,因为序列化发生在缓冲区传输之后 - 并且您的原始 PubsubMessage 实例已被覆盖。