Google Cloud Dataflow Custom Template - only in streaming pipelines

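I'm trying to create a custom template for Google Cloud Dataflow. I just want to print some messages from Pub/Sub to the console. When I try to stage the template, I get an error saying that Cloud Pub/Sub is only available for use in streaming pipelines, yet my pipeline is meant to be a streaming pipeline. What am I doing that makes my pipeline run as batch instead of streaming?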

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)


TOPIC = ...
PROJECT = ...
BUCKET = ...


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
    # No explicit p.run() here: leaving the "with" block runs the pipeline.

Then I run:

python PrintTemplate.py \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate

This results in:

ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.

You're almost there. Just add --streaming to your command:

python PrintTemplate.py \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate \
    --streaming

I see you're using PipelineOptions in code. You can also pass streaming=True there instead:

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1',
    streaming=True
)