Execute dataflow job from App Engine
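I am relatively new to GCP. I am currently working on a POC to create a scheduled Dataflow job that ingests (inserts) data from Google Cloud Storage into BigQuery. After reading some tutorials and documentation, I came up with the following:
First, I created a Dataflow job that reads Avro files and loads them into BigQuery. This Dataflow job has been tested and runs well.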
(self.pipeline
 | output_table + ': read table ' >> ReadFromAvro(storage_input_path)
 | output_table + ': filter columns' >> beam.Map(self.__filter_columns, columns=columns)
 | output_table + ': write to BigQuery' >> beam.Write(
     beam.io.BigQuerySink(output_table,
                          create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
To create a scheduled job, I then created a simple web service as follows:
import logging

from flask import Flask

from common.tableLoader import TableLoader
from ingestion import IngestionToBigQuery
from common.configReader import ConfigReader

app = Flask(__name__)


@app.route('/')
def hello():
    """Return a friendly HTTP greeting."""
    logging.getLogger().setLevel(logging.INFO)
    config = ConfigReader('columbus-config')  # TODO read from args
    tables = TableLoader('experience')
    ingestor = IngestionToBigQuery(config.configuration, tables.list_of_tables)
    ingestor.ingest_table()
    return 'Hello World!'
I also created app.yaml:
runtime: python
env: flex
entrypoint: gunicorn -b :$PORT recsys_data_pipeline.main:app
threadsafe: yes
runtime_config:
  python_version: 2
resources:
  memory_gb: 2.0
I then deployed it with the command gcloud app deploy, but I got the following error:
default[20170417t173837] ERROR:root:The gcloud tool was not found.
default[20170417t173837] Traceback (most recent call last):
  File "/env/local/lib/python2.7/site-packages/apache_beam/internal/gcp/auth.py", line 109, in _refresh
    ['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)
  File "/env/local/lib/python2.7/site-packages/apache_beam/utils/processes.py", line 52, in Popen
    return subprocess.Popen(*args, **kwargs)
  File "/usr/lib/python2.7/subprocess.py", line 710, in __init__
    errread, errwrite)
  File "/usr/lib/python2.7/subprocess.py", line 1335, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
From the message above, I found that the error comes from the apache_beam auth.py module; specifically, it comes from the following function:
def _refresh(self, http_request):
    """Gets an access token using the gcloud client."""
    try:
        gcloud_process = processes.Popen(
            ['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)
    except OSError as exn:
        logging.error('The gcloud tool was not found.', exc_info=True)
        raise AuthenticationException('The gcloud tool was not found: %s' % exn)
    output, _ = gcloud_process.communicate()
    self.access_token = output.strip()
which is invoked when the credentials (service_account_name and service_account_key_file) are not provided:
if google_cloud_options.service_account_name:
    if not google_cloud_options.service_account_key_file:
        raise AuthenticationException(
            'key file not provided for service account.')
    if not os.path.exists(google_cloud_options.service_account_key_file):
        raise AuthenticationException(
            'Specified service account key file does not exist.')
else:
    try:
        credentials = _GCloudWrapperCredentials(user_agent)
        # Check if we are able to get an access token. If not fallback to
        # application default credentials.
        credentials.get_access_token()
        return credentials
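The OSError in the traceback is what subprocess raises when the executable it is asked to start cannot be found, so the failure can be anticipated by checking PATH before the pipeline runs. The following is only a minimal sketch of such a check: `tool_on_path` is a hypothetical helper name, and it assumes Python 3, because `shutil.which` is not available in the Python 2.7 runtime configured in app.yaml.

```python
import shutil


def tool_on_path(name):
    """Return True if an executable called `name` can be found on PATH.

    Hypothetical helper for illustration; it is not part of apache_beam.
    """
    return shutil.which(name) is not None


if __name__ == '__main__':
    # The Beam code quoted above starts 'gcloud auth print-access-token',
    # so the same executable name is checked here.
    if tool_on_path('gcloud'):
        print('gcloud is on PATH; the gcloud credential wrapper can start it.')
    else:
        print('gcloud is not on PATH; starting it would raise OSError.')
```

Running this inside the deployed container would show whether the gcloud-based credential path can work there at all.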
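So I have two questions:
- Is there a way to "attach" the credentials (service_account_name and service_account_key_file) somewhere in my code or in a config file (for example, in app.yaml)?
- What is the best practice for triggering a Dataflow job from Google App Engine?
Thank you very much; any suggestions and comments would be really helpful!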
Please see the official example at https://github.com/amygdala/gae-dataflow.
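On the first question, the auth.py excerpt in the question suggests that the gcloud fallback is skipped when service_account_name and service_account_key_file are set in the pipeline options. The sketch below only assembles such options as command-line style flags. It rests on assumptions: the flag spellings are assumed to mirror the option attribute names in that excerpt and should be checked against the installed apache_beam version, and `build_pipeline_args`, the project id, the account name, and the key path are all hypothetical.

```python
import os


def build_pipeline_args(project, service_account_name, key_file):
    """Assemble pipeline flags that carry explicit service account credentials.

    Hypothetical helper; the flag names are an assumption derived from the
    option attributes read in the quoted auth.py code.
    """
    if not service_account_name:
        raise ValueError('service_account_name must not be empty')
    # Mirror the existence check from the quoted code so a missing key
    # file is reported before the pipeline is constructed.
    if not os.path.exists(key_file):
        raise ValueError('key file does not exist: %s' % key_file)
    return [
        '--project=%s' % project,
        '--service_account_name=%s' % service_account_name,
        '--service_account_key_file=%s' % key_file,
    ]


if __name__ == '__main__':
    # Placeholder values; the key file would have to be deployed with the app.
    try:
        print(build_pipeline_args('my-project', 'my-account', 'key-file'))
    except ValueError as err:
        print('cannot build arguments: %s' % err)
```

The resulting list would then be handed to Beam's pipeline options when the pipeline is constructed, in the same place where the runner and project are configured.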