Do apache_beam pipelines need to be declared using a with statement?

I started learning Apache Beam, and while trying to build my first pipeline (in Python) I ran into some strange behavior.

Here are the pieces of my code:

A file, count_words.txt, containing:

Strawberry
Carrot
Eggplant
Tomato
Potato

A version of the code that works: wordcount_exercise.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

input_file = "data/input/count_words.txt"

# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")

# 2 - Apply transforms
# 2.1 - Read text from file
with beam.Pipeline(options=beam_options) as pipeline:
    lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)

Output:

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Strawberry
Carrot
Eggplant
Tomato
Potato

A version of the code that does not work: wordcount_exercise.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

input_file = "data/input/count_words.txt"

# 1 - Create pipelineOptions object to run the pipeline
beam_options = PipelineOptions(runner="DirectRunner")

# 2 - Create the pipeline using the pipelineOptions
pipeline = beam.Pipeline(options=beam_options)
# 3 - Apply transforms
# 3.1 - Read text from file
lines = pipeline | beam.io.ReadFromText(input_file) | beam.Map(print)

No output.

Why does the pipeline need to be declared in a with statement? Why doesn't a plain declaration work?

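A pipeline does not need a with statement, but using one means you don't have to run the pipeline manually with pipeline.run() (or p.run() if your pipeline variable is named p). Applying transforms with | only builds the pipeline graph; nothing executes until the pipeline is run, which is why the second version prints nothing.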

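Just as opening a file in a with block closes it automatically (calling f.close() for you), using Pipeline() in a with block calls p.run() for you when the block exits, and waits for the run to finish.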
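To illustrate the mechanism, here is a minimal sketch using a hypothetical ToyPipeline class (not part of Beam) that records steps when they are applied and executes them only in run(). Its __exit__ calls run() only when the block exits without an exception, which roughly mirrors what beam.Pipeline does in its own context-manager methods.

```python
class ToyPipeline:
    """Hypothetical stand-in for beam.Pipeline: applying a step only records it;
    steps execute when run() is called."""

    def __init__(self):
        self.steps = []
        self.executed = []

    def apply(self, fn):
        # Building the graph: nothing is executed here.
        self.steps.append(fn)
        return self

    def run(self):
        # Execution happens only here.
        for fn in self.steps:
            self.executed.append(fn())
        return self.executed

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Run automatically, but only if the with block exited cleanly.
        if exc_type is None:
            self.run()
        return False  # do not swallow exceptions


# Without "with": the step is recorded but never executed.
p = ToyPipeline()
p.apply(lambda: "Strawberry")
print(p.executed)  # []

# With "with": __exit__ calls run() for us.
with ToyPipeline() as q:
    q.apply(lambda: "Strawberry")
print(q.executed)  # ['Strawberry']
```

The design mirrors the file analogy: the with block guarantees the "finishing" call happens, so forgetting it is impossible.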

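For the code you posted, just add pipeline.run() at the end. On runners that execute asynchronously, use pipeline.run().wait_until_finish() so your script blocks until the pipeline completes.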