Skipping a step in an Apache Beam pipeline (Python)
I'm building an Apache Beam pipeline and am having trouble skipping the remaining steps in the Python SDK. Here is a simplified example of the problem:
import apache_beam as beam
import os

# API_KEY is a placeholder for the path to the credentials file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY


def foo(message):
    pass


options = {
    'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(runner, options=opts) as p:
    # my_sub is a placeholder for the Pub/Sub subscription path.
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
    result = (sub_message | 'foo' >> beam.Map(foo))
    result | 'print' >> beam.Map(print)
    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()
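So according to this: in Java, if my function doesn't return anything, then apache_beam should skip the rest of the steps. Correct me if I'm wrong, but in Python that is the same as returning None, so my pass could be replaced with return None and behave exactly the same. However, when I run this code with either pass or return None, the result does go on to the next step. That is, it keeps printing None when it should not be printing anything, since it should be skipping all the subsequent steps. Any help is appreciated.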
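Funny, as soon as I posted this I found the answer in the docs. It looks like the equivalent of what is in the link I provided is to use a ParDo rather than a Map, as I had done. So it should really look like this: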
import apache_beam as beam
import os

# credentials is a placeholder for the path to the credentials file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials


class TestFn(beam.DoFn):
    def process(self, element):
        print('hi')
        # Returning None from process() emits no elements downstream.
        pass


options = {
    'streaming': True
}
runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(runner, options=opts) as p:
    # mysub is a placeholder for the Pub/Sub subscription path.
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
    result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
    result | 'print' >> beam.Map(print)
    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()