测试多次产生相同实例的步骤
Test a step that yields the same instance multiple times
我们有一个步骤可以在 Dataflow 的换行符上拆分 Pubsub 消息。我们有一个通过代码的测试,但它似乎在生产中失败了。看起来我们同时在管道的多个位置收到相同的 Pubsub 消息(至少据我所知)。
我们应该用另一种方式编写第一个测试吗?或者这只是关于在 Apache Beam 中不能做什么的惨痛教训?
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
import unittest
class SplitUpBatches(beam.DoFn):
def process(self, msg):
bodies = msg.data.split('\n')
for body in bodies:
msg.data = body.strip()
yield msg
class TestSplitting(unittest.TestCase):
body = """
first
second
third
""".strip()
def test_incorrectly_passing(self):
"""Incorrectly passing"""
msg = PubsubMessage(self.body, {})
with TestPipeline() as p:
assert_that(
p
| beam.Create([msg])
| "split up batches" >> beam.ParDo(SplitUpBatches())
| "map to data" >> beam.Map(lambda m: m.data),
equal_to(['first', 'second', 'third']))
def test_correctly_failing(self):
"""Failing, but not using a TestPipeline"""
msg = PubsubMessage(self.body, {})
messages = list(SplitUpBatches().process(msg))
bodies = [m.data for m in messages]
self.assertEqual(bodies, ['first', 'second', 'third'])
# => AssertionError: ['third', 'third', 'third'] != ['first', 'second', 'third']
TL;DR: 是的,这是 不要在 Beam 中做的事情的一个例子:重新利用(变异)你的元素对象.
事实上,Beam 不鼓励改变你的转换的输入和输出,因为 Beam passes/buffers 如果你改变它们,这些对象会以各种方式受到影响。
这里的建议是为每个输出创建一个新的 PubsubMessage
实例。
详细解释
这是由于 Beam 序列化和传递数据的方式所致。
您可能知道 Beam 在单个 worker 中一起执行多个步骤——我们称之为阶段。您的管道执行如下操作:
read_data -> split_up_batches -> serialize all data -> perform assert
这个中间 serialize data
步骤是一个实现细节。原因是对于 Beam assert_that
我们将单个 PCollection 的所有数据收集到一台机器中,并执行断言(因此我们需要序列化所有元素并将它们发送到一台机器)。我们通过 GroupByKey 操作来做到这一点。
当 DirectRunner
接收到 PubsubMessage('first')
的第一个 yield
时,它会将其序列化并 立即将其传输到 GroupByKey - 所以您得到 'first', 'second', 'third'
结果 - 因为序列化会立即发生。
当 DataflowRunner
接收到 PubsubMessage('first')
的第一个 yield
时,它 对其进行缓冲 ,然后发送一批元素。您得到 'third', 'third', 'third'
结果,因为序列化发生在缓冲区传输之后 - 并且您的原始 PubsubMessage
实例已被覆盖。
我们有一个步骤可以在 Dataflow 的换行符上拆分 Pubsub 消息。我们有一个通过代码的测试,但它似乎在生产中失败了。看起来我们同时在管道的多个位置收到相同的 Pubsub 消息(至少据我所知)。
我们应该用另一种方式编写第一个测试吗?或者这只是关于在 Apache Beam 中不能做什么的惨痛教训?
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
import unittest
class SplitUpBatches(beam.DoFn):
def process(self, msg):
bodies = msg.data.split('\n')
for body in bodies:
msg.data = body.strip()
yield msg
class TestSplitting(unittest.TestCase):
body = """
first
second
third
""".strip()
def test_incorrectly_passing(self):
"""Incorrectly passing"""
msg = PubsubMessage(self.body, {})
with TestPipeline() as p:
assert_that(
p
| beam.Create([msg])
| "split up batches" >> beam.ParDo(SplitUpBatches())
| "map to data" >> beam.Map(lambda m: m.data),
equal_to(['first', 'second', 'third']))
def test_correctly_failing(self):
"""Failing, but not using a TestPipeline"""
msg = PubsubMessage(self.body, {})
messages = list(SplitUpBatches().process(msg))
bodies = [m.data for m in messages]
self.assertEqual(bodies, ['first', 'second', 'third'])
# => AssertionError: ['third', 'third', 'third'] != ['first', 'second', 'third']
TL;DR: 是的,这是 不要在 Beam 中做的事情的一个例子:重新利用(变异)你的元素对象.
事实上,Beam 不鼓励改变你的转换的输入和输出,因为 Beam passes/buffers 如果你改变它们,这些对象会以各种方式受到影响。
这里的建议是为每个输出创建一个新的 PubsubMessage
实例。
详细解释
这是由于 Beam 序列化和传递数据的方式所致。
您可能知道 Beam 在单个 worker 中一起执行多个步骤——我们称之为阶段。您的管道执行如下操作:
read_data -> split_up_batches -> serialize all data -> perform assert
这个中间 serialize data
步骤是一个实现细节。原因是对于 Beam assert_that
我们将单个 PCollection 的所有数据收集到一台机器中,并执行断言(因此我们需要序列化所有元素并将它们发送到一台机器)。我们通过 GroupByKey 操作来做到这一点。
当 DirectRunner
接收到 PubsubMessage('first')
的第一个 yield
时,它会将其序列化并 立即将其传输到 GroupByKey - 所以您得到 'first', 'second', 'third'
结果 - 因为序列化会立即发生。
当 DataflowRunner
接收到 PubsubMessage('first')
的第一个 yield
时,它 对其进行缓冲 ,然后发送一批元素。您得到 'third', 'third', 'third'
结果,因为序列化发生在缓冲区传输之后 - 并且您的原始 PubsubMessage
实例已被覆盖。