Error while running Dataflow job via Airflow: module 'apache_beam.io' has no attribute 'ReadFromBigQuery'
I'm having some trouble trying to execute a Dataflow job orchestrated by Airflow. After triggering the DAG, I get this error:
module 'apache_beam.io' has no attribute 'ReadFromBigQuery'
[2021-05-05 19:35:31,279] {base_task_runner.py:115} INFO - Job 559673: Subtask exec_df_job [2021-05-05 19:35:31,278] {gcp_dataflow_hook.py:121} INFO - Running command: python /tmp/dataflow50825def-pipeline.py --runner=DataflowRunner --project=project_name --temp_location=gs://tmp/ --runner=DataflowRunner --save_main_session=True --requirements_file=https://storage.cloud.google.com/df-jobs/requirements.txt --beam_plugins=[gcp] --labels=airflow-version=v1-10-6-composer --job_name=job-name-a6fe02fb --region=us-central1
[2021-05-05 19:35:31,516] {base_task_runner.py:115} INFO - Job 559673: Subtask exec_df_job [2021-05-05 19:35:31,516] {gcp_dataflow_hook.py:152} INFO - Start waiting for DataFlow process to complete.
[2021-05-05 19:35:42,605] {base_task_runner.py:115} INFO - Job 559673: Subtask exec_df_job [2021-05-05 19:35:42,603] {gcp_dataflow_hook.py:133} WARNING - b'/opt/python3.6/lib/python3.6/site-packages/apache_beam/__init__.py:84: UserWarning: Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam.\n \'Some syntactic constructs of Python 3 are not yet fully supported by \'\nTraceback (most recent call last):\n File "/tmp/dataflow50825def-pipeline.py", line 94, in <module>\n run()\n File "/tmp/dataflow50825def-pipeline.py", line 83, in run\n | "Writes to BQ" >> beam.io.WriteToBigQuery(\nAttributeError: module \'apache_beam.io\' has no attribute \'ReadFromBigQuery\''
[2021-05-05 19:35:42,613] {taskinstance.py:1059} ERROR - DataFlow failed with return code 1
In my DAG definition file, I have something like this:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2021, 4, 27),
    'catchup': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(seconds=5),
}

with DAG(
    dag_id='test_dataflow_jobs',
    default_args=default_args,
    schedule_interval=None
) as dag:
    dataflow_default_options = {
        'project': 'project-name',
        'temp_location': 'gs://tmp/',
        'runner': 'DataflowRunner',
        'save_main_session': 'True',
        'extra_packages': 'apache-beam[gcp]==2.28.0'
    }
    options = {
        'requirements_file': 'https://storage.cloud.google.com/df-jobs/requirements.txt',
        'beam_plugins': "['gcp']"
    }
    exec_df_job = DataFlowPythonOperator(
        task_id='exec_df_job',
        gcp_conn_id='google-cloud-default',
        py_file='gs://df-jobs/pipeline.py',
        job_name='job-name',
        dataflow_default_options=dataflow_default_options,
        options=options
    )
Finally, my Apache Beam pipeline:
import logging
from datetime import datetime
from argparse import ArgumentParser

import apache_beam as beam
from apache_beam.io import ReadFromBigQuery, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

sql_query = """SELECT * FROM my_table"""

table_schema = {'fields': [
    {'name': 'a', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    {'name': 'b', 'type': 'BOOLEAN', 'mode': 'NULLABLE'},
    {'name': 'c', 'type': 'DATETIME', 'mode': 'NULLABLE'}]}


def parse_data(record):
    pass


def run(argv=None):
    parser = ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipe = beam.Pipeline(options=pipeline_options)

    input_data = (
        pipe
        | "Reads data from bigquery" >> ReadFromBigQuery(query=sql_query, use_standard_sql=True)
        | "Map cnpj output" >> beam.Map(parse_data)
        | "Writes to BQ" >> WriteToBigQuery(
            table="table_name",
            dataset="dataset_name",
            project="project_name",
            schema=table_schema)
    )

    pipe.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
The strange thing is that when I execute the same job via the command line, the pipeline runs successfully with no errors.
Executing via the CLI:
python3 pipeline.py --temp_location gs://tmp/ --project project_name --runner DataflowRunner --region us-central1 --name job_name
Please help me :(
The main issue in this question turned out to be the famous "it works on my machine": different framework versions.
After installing apache-beam[gcp] on my Cloud Composer environment (Apache Airflow), I noticed that the Apache Beam SDK version there is 2.15.0, which does not include the ReadFromBigQuery and WriteToBigQuery implementations.
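A quick way to confirm the mismatch is to print the SDK version each environment actually imports. The snippet below is a minimal diagnostic sketch of my own (not part of the original setup), runnable locally or from a one-off task in Composer:

import apache_beam

# Prints the installed Beam SDK version, e.g. 2.15.0 on the Composer
# workers versus 2.29.0 on my local machine.
print(apache_beam.__version__)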
We are staying on this version because it is the one compatible with our Composer version. After changing my code, everything worked fine:
import logging
from datetime import datetime
from argparse import ArgumentParser

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

sql_query = """SELECT * FROM my_table"""

# Needed to change how to declare the schema
table_schema = ('a:INTEGER, b:BOOLEAN, c:DATETIME')


def parse_data(record):
    pass


def run(argv=None):
    parser = ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipe = beam.Pipeline(options=pipeline_options)

    input_data = (
        pipe
        | "Reads data from bigquery" >> beam.io.Read(beam.io.BigQuerySource(query=sql_query, use_standard_sql=True))
        | "Map cnpj output" >> beam.Map(parse_data)
        | "Writes to BQ" >> beam.io.Write(beam.io.BigQuerySink(
            'project_name:dataset_name.table_name',
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        ))
    )

    pipe.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Why did my old code work?
Again, because of the different Apache Beam versions. Since I'm on Ubuntu 20.04 with an up-to-date pip, running pip3 install 'apache-beam[gcp]' installs SDK version 2.29.0.
For now, I've created a development environment that matches the versions in my Composer environment, and I plan to update my Composer and Airflow versions (and, with them, my PyPI packages).
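As an aside, while the versions are still out of sync, one option is to make the pipeline tolerate both SDKs. The sketch below is my own hedged workaround (not part of the solution above): it uses the newer transform when it can be imported and falls back to the legacy source on 2.15.0.

import apache_beam as beam

sql_query = """SELECT * FROM my_table"""  # same query as in the pipeline above

try:
    # Present in newer SDKs such as 2.29.0, missing in 2.15.0
    from apache_beam.io import ReadFromBigQuery
    read_from_bq = ReadFromBigQuery(query=sql_query, use_standard_sql=True)
except ImportError:
    # Legacy transform that 2.15.0 does provide
    read_from_bq = beam.io.Read(
        beam.io.BigQuerySource(query=sql_query, use_standard_sql=True))

# read_from_bq can then be applied in the pipeline, e.g.:
#     pipe | "Reads data from bigquery" >> read_from_bq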