Google Cloud Composer(Airflow) - dataflow job inside a DAG executes successfully, but the DAG fails
My DAG looks like this:
import datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'dataflow_default_options': {
        'project': 'test',
        'tempLocation': 'gs://test/dataflow/pipelines/temp/',
        'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '1',
        'region': 'asia-east1'
    }
}

dag = DAG(
    dag_id='gcs_avro_to_bq_dag',
    default_args=default_args,
    description='ETL for loading data from GCS(present in the avro format) to BQ',
    schedule_interval=None,
    dagrun_timeout=datetime.timedelta(minutes=30))

task = DataFlowJavaOperator(
    task_id='gcs_avro_to_bq_flow_job',
    jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
    poll_sleep=1,
    options={
        'input': '{{ ts }}',
    },
    dag=dag)
My DAG executes a jar file. The jar contains the code for running a Dataflow job that writes data from GCS to BQ. The jar itself executes successfully.
When I try to execute the Airflow task, I see the following error:
[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] {gcp_api_base_hook.py:97} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-05-20 17:20:41,937] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,853] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest
[2020-05-20 17:20:44,338] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:44,338] {discovery.py:873} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json
[2020-05-20 17:20:45,285] {__init__.py:1631} ERROR - <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json returned "(7e83a8221abb0a9b): Information about job asia-east1 could not be found in our system. Please double check the id is correct. If it is please contact customer support.">
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 184, in execute
    self.jar, self.job_class)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 220, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 286, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 200, in _start_dataflow
    self.poll_sleep, job_id).wait_for_done()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 44, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 63, in _get_job
    jobId=self._job_id).execute(num_retries=5)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)
I did some more digging and I can see that Airflow is calling the following API: https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1
As you can see, the last path parameter after jobs is asia-east1, so it looks like the Airflow task is using the region I supplied in default_args as the job id when it polls for the status of the Dataflow job. I am not sure that this is what is actually happening, but I wanted to note the observation. Am I missing something in my DAG? My Java job logic is as follows:
public class GcsAvroToBQ {

    public interface Options extends PipelineOptions {
        @Description("Input")
        ValueProvider<String> getInput();

        void setInput(ValueProvider<String> value);
    }

    /**
     * Main entry point for executing the pipeline.
     *
     * @param args The command-line arguments to the pipeline.
     */
    public static void main(String[] args) {
        GcsAvroToBQ.Options options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(GcsAvroToBQ.Options.class);

        options.getJobName();

        run(options);
    }

    public static PipelineResult run(Options options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // My Pipeline logic to read Avro and upload to BQ
        PCollection<TableRow> tableRowsForBQ; // Data to store in BQ
        tableRowsForBQ.apply(
            BigQueryIO.writeTableRows()
                .to(bqDatasetName)
                .withSchema(fieldSchemaListBuilder.schema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        return pipeline.run();
    }
}
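For reference, my understanding (an assumption about how the contrib DataFlowHook behaves, not code taken from it) is that the operator merges dataflow_default_options with options and passes everything to the jar as --key=value flags, which is how getInput() receives the rendered value of {{ ts }}:

# Rough sketch of how the options appear to reach the jar; the flag-building
# shown here is an assumption, not the actual hook implementation.
merged_options = {
    'project': 'test',
    'region': 'asia-east1',
    'input': '2020-05-20T17:20:41+00:00',  # rendered value of '{{ ts }}'
    # ... plus the remaining dataflow_default_options
}
cmd = ['java', '-jar', 'test-1.0-SNAPSHOT.jar', '--runner=DataflowRunner']
cmd += ['--{}={}'.format(key, value) for key, value in merged_options.items()]
# The hook then runs this command and polls the Dataflow API for the job it started.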
This is a confirmed bug with SDK version 2.20.0:
https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/hooks/dataflow.py#L47
Please use version 2.19.0 and it should work correctly.
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.19.0</version>
    <scope>runtime</scope>
</dependency>
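To illustrate the bug (my own reconstruction, assuming a 2.20.0 job prints a console URL of the form .../dataflow/jobs/<region>/<job_id>, which is consistent with the 404 above): the hook extracts the Dataflow job id from that URL with a regex, and the pattern used by the contrib hook ends up capturing the region segment instead of the job id. Adding .*/ after /jobs/, as the linked fix does, makes it capture the last path segment:

import re

# A 2.20.0-style console URL as it could appear in the job's log output
# (region and job id are hypothetical example values).
line = b'https://console.cloud.google.com/dataflow/jobs/asia-east1/2020-05-20_10_20_41-1234567890?project=test'

# Pattern roughly like the one shipped in the contrib hook (assumption):
old_pattern = re.compile(br'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
# Pattern with the fix applied (extra `.*/` so the last path segment is captured):
new_pattern = re.compile(br'.*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*')

print(old_pattern.search(line).group(1))  # b'asia-east1' -> polled as the job id, hence the 404
print(new_pattern.search(line).group(1))  # b'2020-05-20_10_20_41-1234567890'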
Since the fix has not been released yet, even though it has been merged to master, I will add the following workaround for anyone who needs to use a Beam SDK version newer than 2.19.0.
The idea is to implement the fix in a custom hook (identical to dataflow_hook.py but with the suggested change applied) and then implement a custom operator that uses this hook. Here is how I did it:
First, I created a file named my_dataflow_hook.py:
import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        # Fixed pattern: the extra `.*/` after `/jobs/` ensures the last path
        # segment (the job id) is captured instead of the region.
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        # Same as DataFlowHook._start_dataflow, but uses the patched _myDataflow class.
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()
Then, I created a file named my_dataflow_java_operator.py:
import copy

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowJavaOperator(DataFlowJavaOperator):
    def execute(self, context):
        # Same as DataFlowJavaOperator.execute, but instantiates MyDataFlowHook
        # so the patched job-id extraction is used.
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.jar = bucket_helper.google_cloud_to_local(self.jar)

        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                              delegate_to=self.delegate_to,
                              poll_sleep=self.poll_sleep)

        dataflow_options = copy.copy(self.dataflow_default_options)
        dataflow_options.update(self.options)

        hook.start_java_dataflow(self.job_name, dataflow_options,
                                 self.jar, self.job_class)


class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowJavaOperator]
Finally, I uploaded these files to the Composer environment's bucket following this structure:
├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_java_operator.py
Now I can create tasks in my DAG using MyDataFlowJavaOperator:
from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowJavaOperator
...
with DAG("df-custom-test", default_args=default_args) as dag:
    test_task = MyDataFlowJavaOperator(dag=dag, task_id="df-java", jar=JAR_FILE, job_name=JOB_NAME)
Of course, you can do the same with DataFlowPythonOperator or DataflowTemplateOperator if needed.
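For instance, a Python counterpart could look roughly like this (a minimal sketch based on my reading of the contrib DataFlowPythonOperator; copy the execute body from your own Airflow version and only swap DataFlowHook for MyDataFlowHook):

import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook


class MyDataFlowPythonOperator(DataFlowPythonOperator):
    def execute(self, context):
        # Mirrors DataFlowPythonOperator.execute, but uses the patched hook.
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)

        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                              delegate_to=self.delegate_to,
                              poll_sleep=self.poll_sleep)

        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)

        # The contrib operator converts camelCase option names to snake_case
        # before handing them to the Python SDK.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}

        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)

Remember to also append the new operator to the operators list of MyDataFlowPlugin so that it is exposed through the dataflow_fix_plugin module.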