Google Cloud Dataflow Custom Template - only in streaming pipelines
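I'm trying to create a custom template for Google Cloud Dataflow. I just want to print some messages from Pub/Sub to the console. When I try to stage the template, I get an error saying that Cloud Pub/Sub is only available in streaming pipelines, even though my pipeline is meant to be a streaming pipeline.
What am I doing that makes my pipeline a batch pipeline instead of a streaming one?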
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)


TOPIC = ...
PROJECT = ...
BUCKET = ...

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

# Leaving the "with" block runs the pipeline, so no separate p.run() is needed.
with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
Then I run:
python -m PrintTemplate \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate
This fails with:
ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.
You're almost there. Just add --streaming to your command:
python -m PrintTemplate \
    --runner DataflowRunner --project [PROJECT] \
    --staging_location gs://[BUCKET]/staging \
    --temp_location gs://[BUCKET]/temp \
    --template_location gs://[BUCKET]/templates/PrintTemplate \
    --streaming
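If you stage templates from a script rather than by hand, the same command can be assembled as an argument list. The sketch below uses a hypothetical helper, staging_argv, that always appends --streaming; "my-project" and "my-bucket" are placeholders, and the script is run as a module (python -m takes the module name without ".py").

```python
import sys


def staging_argv(project, bucket, module="PrintTemplate", streaming=True):
    """Build the argv for staging the template (hypothetical helper, not part of Beam)."""
    argv = [
        sys.executable, "-m", module,
        "--runner", "DataflowRunner",
        "--project", project,
        "--staging_location", f"gs://{bucket}/staging",
        "--temp_location", f"gs://{bucket}/temp",
        "--template_location", f"gs://{bucket}/templates/{module}",
    ]
    if streaming:
        # ReadFromPubSub is rejected on Dataflow unless the pipeline is streaming.
        argv.append("--streaming")
    return argv


cmd = staging_argv("my-project", "my-bucket")
# Show the command without the interpreter path.
print(" ".join(cmd[1:]))
# To actually stage the template: subprocess.run(cmd, check=True)
```

Building a list instead of a shell string avoids quoting problems and makes it hard to forget the --streaming flag.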
I see you're using PipelineOptions. You can also pass streaming=True there instead:
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1',
    streaming=True
)
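Either form sets the same boolean streaming option. Assuming, as in Beam's StandardOptions, that --streaming is declared as a store_true flag defaulting to False, a pipeline is treated as batch unless you opt in, which is why the original command hit the Pub/Sub error. The stdlib model below mimics that default; it is an illustration, not Beam's own parser.

```python
import argparse


def parse_streaming(argv):
    """Model of a boolean --streaming flag: absent means batch, present means streaming."""
    parser = argparse.ArgumentParser()
    # store_true: the value is False unless the flag appears on the command line.
    parser.add_argument("--streaming", action="store_true", default=False)
    # parse_known_args leaves the other pipeline flags (--runner, ...) unparsed.
    known, _unknown = parser.parse_known_args(argv)
    return known.streaming


original = ["--runner", "DataflowRunner", "--temp_location", "gs://BUCKET/temp"]
fixed = original + ["--streaming"]
print(parse_streaming(original))  # batch: flag absent
print(parse_streaming(fixed))     # streaming: flag present
```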