将额外参数传递给 DataFlowOperator 上下文

Pass extra arguments to DataFlowOperator context

当我触发我的 DAG 时,我在 dag_run.conf['metadata']

中传递了一堆额外的参数

所以我的触发事件是这样的:

{'bucket': 'blah-blah', 
      'contentType': 'text/json', 
      'crc32c': '375Jog==', 'customTime': '1970-01-01T00:00:00.000Z',
      'etag': 'CJCqi+DTrO0CEAk=', 'eventBasedHold': False, 'generation': '1606821286696208',
      'id': 'xxx',
      'kind': 'storage#object',
      'md5Hash': 'xxxx',
      'mediaLink': 'xxxx',
      'metadata': {'url': 'xxx',
                   'date_extracted': '20201115',
                   'file_type': 'xxxx',
                   'filename': 'xxxx.json',
                   'row_count': '30', 'time_extracted': '063013'},
}

我有一个在 on_failure_callback 上运行的 python 函数,但这里的上下文与 dag_run 上下文完全不同。

失败时传递给函数的上下文是:

{'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>,
 'dag': <DAG: my_dag>, 'ds': '2020-12-09', 
'next_ds': '2020-12-09', 
'next_ds_nodash': '20201209',
....}

有没有办法将 dag_run.conf['metadata'] 作为新上下文的一部分传递?

我试过使用 partial"{{ dag_run.conf['metadata'] }}" 被解释为字符串。

我的数据流运算符如下所示:

DataflowTemplateOperator(
        task_id="df_task1",
        job_name="df-{{ dag_run.conf['trace_id'] }}-{{dag_run.conf['file_type']}}",
        template="gs://dataflow/my-df-job",
        on_failure_callback= partial(task_fail_slack_alert,"{{ dag_run.conf['metadata'] }}"),
        parameters={
            "filePath":"{{ dag_run.conf['file_name'] }}",
            "traceId":"{{ dag_run.conf['trace_id'] }}"
        },
    )

我的可调用函数暂时打印出来:

def task_fail_slack_alert(dag_run,context):
    print("payload {}".format(context))
    print("dag_run {}".format(dag_run))

日志:

INFO - payload {'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>...}
INFO - dag_run {{ dag_run.conf['metadata'] }}

你不能那样使用 {{ dag_run.conf['metadata'] }}。 您可以从函数的 context 访问它:

def task_fail_slack_alert(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)
    print(dag_run.conf.get('metadata'))
    print(context.get('exception'))

DataflowTemplateOperator(
        task_id="df_task1",
        job_name="df-{{ dag_run.conf['trace_id'] }}-{{dag_run.conf['file_type']}}",
        template="gs://dataflow/my-df-job",
        on_failure_callback=task_fail_slack_alert,
        parameters={
            "filePath":"{{ dag_run.conf['file_name'] }}",
            "traceId":"{{ dag_run.conf['trace_id'] }}"
        },
    )