创建 Google 数据流模板文件时出错
Error when creating Google Dataflow template file
我正在尝试使用模板安排在一定时间后结束的数据流。使用命令行时我能够成功执行此操作,但是当我尝试使用 Google Cloud Scheduler 执行此操作时,我 运行 在创建模板时遇到错误。
错误是
File "pipelin_stream.py", line 37, in <module>
main()
File "pipelin_stream.py", line 34, in main
result.cancel()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1638, in cancel
raise IOError('Failed to get the Dataflow job id.')
IOError: Failed to get the Dataflow job id.
我用来制作模板的命令是
python pipelin_stream.py \
--runner Dataflowrunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--template_location $BUCKET/templates/time_template_test \
--streaming
我的管道文件是这个
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
PROJECT = 'projectID'
schema = 'ex1:DATE, ex2:STRING'
TOPIC = "projects/topic-name/topics/scraping-test"
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions(region='us-central1', service_account_email='email'))
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x:x.decode('utf-8'))
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('tablename'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish(duration=3000)
result.cancel() # If the pipeline has not finished, you can cancel it
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
有谁知道为什么我会收到此错误?
等待时间过后 cancel function 引发错误,似乎无害。
为了证明这一点,我设法从我的 python 3.5 虚拟机中重现了您的确切问题。该模板由 --template_location
在给定路径中创建,可用于 运行 作业。请注意,我需要对您的代码进行一些更改才能使其在 Dataflow 中实际工作。
如果它对你有用,我最终使用了这个管道代码
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import datetime
# Fill this values in order to have them by default
# Note that the table in BQ needs to have the column names message_body and publish_time
Table = 'projectid:datasetid.tableid'
schema = 'ex1:STRING, ex2:TIMESTAMP'
TOPIC = "projects/<projectid>/topics/<topicname>"
class AddTimestamps(beam.DoFn):
def process(self, element, publish_time=beam.DoFn.TimestampParam):
"""Processes each incoming element by extracting the Pub/Sub
message and its publish timestamp into a dictionary. `publish_time`
defaults to the publish timestamp returned by the Pub/Sub server. It
is bound to each element by Beam at runtime.
"""
yield {
"message_body": element.decode("utf-8"),
"publish_time": datetime.datetime.utcfromtimestamp(
float(publish_time)
).strftime("%Y-%m-%d %H:%M:%S.%f"),
}
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic", default=TOPIC)
parser.add_argument("--output_table", default=Table)
args, beam_args = parser.parse_known_args(argv)
# save_main_session needs to be set to true due to modules being used among the code (mostly datetime)
# Uncomment the service account email to specify a custom service account
p = beam.Pipeline(argv=beam_args,options=PipelineOptions(save_main_session=True,
region='us-central1'))#, service_account_email='email'))
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=args.input_topic).with_output_types(bytes)
| "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(args.output_table, schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
#Warning: Cancel does not work properly in a template
result.wait_until_finish(duration=3000)
result.cancel() # Cancel the streaming pipeline after a while to avoid consuming more resources
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
之后我 运行 命令:
# Fill accordingly
PROJECT="MYPROJECT-ID"
BUCKET="MYBUCKET"
TEMPLATE_NAME="TRIAL"
# create the template
python3 -m templates.template-pubsub-bigquery \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$BUCKET/staging \
--temp_location gs://$BUCKET/temp \
--template_location gs://$BUCKET/templates/$TEMPLATE_NAME \
--streaming
创建管道(产生您提到的错误但仍创建模板)。
并且
# Fill job-name and gcs location accordingly
# Uncomment and fill the parameters should you want to use your own
gcloud dataflow jobs run <job-name> \
--gcs-location "gs://<MYBUCKET>/dataflow/templates/mytemplate"
# --parameters input_topic="", output_table=""
到运行管道。
正如我所说,模板已正确创建且管道正常工作。
编辑
的确取消功能在模板中不能正常使用。这似乎是一个问题,它需要在模板创建时使用作业 ID,当然它不存在,因此它省略了该功能。
我发现 处理在管道上提取作业 ID。我尝试了一些调整以使其在模板代码本身内工作,但我认为没有必要。如果您想安排它们的执行时间,我会选择更简单的选项并在特定时间(例如 9:01 GMT)执行流式管道模板,并使用脚本取消管道
import logging, re,os
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def retrieve_job_id():
#Fill as needed
project = '<project-id>'
job_prefix = "<job-name>"
location = '<location>'
logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))
try:
credentials = GoogleCredentials.get_application_default()
dataflow = build('dataflow', 'v1b3', credentials=credentials)
result = dataflow.projects().locations().jobs().list(
projectId=project,
location=location,
).execute()
job_id = "none"
for job in result['jobs']:
if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
job_id = job['id']
break
logging.info("Job ID: {}".format(job_id))
return job_id
except Exception as e:
logging.info("Error retrieving Job ID")
raise KeyError(e)
os.system('gcloud dataflow jobs cancel {}'.format(retrieve_job_id()))
在另一个时间(例如 9:05 GMT)。此脚本假定您每次都运行使用相同的作业名称连接脚本,并采用名称的最新外观运行ce 并将其取消。试了好几次都很好
我正在尝试使用模板安排在一定时间后结束的数据流。使用命令行时我能够成功执行此操作,但是当我尝试使用 Google Cloud Scheduler 执行此操作时,我 运行 在创建模板时遇到错误。
错误是
File "pipelin_stream.py", line 37, in <module>
main()
File "pipelin_stream.py", line 34, in main
result.cancel()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1638, in cancel
raise IOError('Failed to get the Dataflow job id.')
IOError: Failed to get the Dataflow job id.
我用来制作模板的命令是
python pipelin_stream.py \
--runner Dataflowrunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--template_location $BUCKET/templates/time_template_test \
--streaming
我的管道文件是这个
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
PROJECT = 'projectID'
schema = 'ex1:DATE, ex2:STRING'
TOPIC = "projects/topic-name/topics/scraping-test"
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions(region='us-central1', service_account_email='email'))
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x:x.decode('utf-8'))
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('tablename'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish(duration=3000)
result.cancel() # If the pipeline has not finished, you can cancel it
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
有谁知道为什么我会收到此错误?
等待时间过后 cancel function 引发错误,似乎无害。
为了证明这一点,我设法从我的 python 3.5 虚拟机中重现了您的确切问题。该模板由 --template_location
在给定路径中创建,可用于 运行 作业。请注意,我需要对您的代码进行一些更改才能使其在 Dataflow 中实际工作。
如果它对你有用,我最终使用了这个管道代码
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import datetime
# Fill this values in order to have them by default
# Note that the table in BQ needs to have the column names message_body and publish_time
Table = 'projectid:datasetid.tableid'
schema = 'ex1:STRING, ex2:TIMESTAMP'
TOPIC = "projects/<projectid>/topics/<topicname>"
class AddTimestamps(beam.DoFn):
def process(self, element, publish_time=beam.DoFn.TimestampParam):
"""Processes each incoming element by extracting the Pub/Sub
message and its publish timestamp into a dictionary. `publish_time`
defaults to the publish timestamp returned by the Pub/Sub server. It
is bound to each element by Beam at runtime.
"""
yield {
"message_body": element.decode("utf-8"),
"publish_time": datetime.datetime.utcfromtimestamp(
float(publish_time)
).strftime("%Y-%m-%d %H:%M:%S.%f"),
}
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic", default=TOPIC)
parser.add_argument("--output_table", default=Table)
args, beam_args = parser.parse_known_args(argv)
# save_main_session needs to be set to true due to modules being used among the code (mostly datetime)
# Uncomment the service account email to specify a custom service account
p = beam.Pipeline(argv=beam_args,options=PipelineOptions(save_main_session=True,
region='us-central1'))#, service_account_email='email'))
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=args.input_topic).with_output_types(bytes)
| "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(args.output_table, schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
#Warning: Cancel does not work properly in a template
result.wait_until_finish(duration=3000)
result.cancel() # Cancel the streaming pipeline after a while to avoid consuming more resources
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
之后我 运行 命令:
# Fill accordingly
PROJECT="MYPROJECT-ID"
BUCKET="MYBUCKET"
TEMPLATE_NAME="TRIAL"
# create the template
python3 -m templates.template-pubsub-bigquery \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$BUCKET/staging \
--temp_location gs://$BUCKET/temp \
--template_location gs://$BUCKET/templates/$TEMPLATE_NAME \
--streaming
创建管道(产生您提到的错误但仍创建模板)。 并且
# Fill job-name and gcs location accordingly
# Uncomment and fill the parameters should you want to use your own
gcloud dataflow jobs run <job-name> \
--gcs-location "gs://<MYBUCKET>/dataflow/templates/mytemplate"
# --parameters input_topic="", output_table=""
到运行管道。
正如我所说,模板已正确创建且管道正常工作。
编辑
的确取消功能在模板中不能正常使用。这似乎是一个问题,它需要在模板创建时使用作业 ID,当然它不存在,因此它省略了该功能。
我发现
import logging, re,os
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
def retrieve_job_id():
#Fill as needed
project = '<project-id>'
job_prefix = "<job-name>"
location = '<location>'
logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))
try:
credentials = GoogleCredentials.get_application_default()
dataflow = build('dataflow', 'v1b3', credentials=credentials)
result = dataflow.projects().locations().jobs().list(
projectId=project,
location=location,
).execute()
job_id = "none"
for job in result['jobs']:
if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
job_id = job['id']
break
logging.info("Job ID: {}".format(job_id))
return job_id
except Exception as e:
logging.info("Error retrieving Job ID")
raise KeyError(e)
os.system('gcloud dataflow jobs cancel {}'.format(retrieve_job_id()))
在另一个时间(例如 9:05 GMT)。此脚本假定您每次都运行使用相同的作业名称连接脚本,并采用名称的最新外观运行ce 并将其取消。试了好几次都很好