传递字符串列表作为 Airflow 中依赖任务的参数

Pass a list of strings as parameter of a dependant task in Airflow

我正在尝试通过 XCom 将字符串列表从一个任务传递到另一个任务,但我似乎无法将推送的列表解释回列表。

例如,当我在某个函数 blah 中执行此操作时,即 ShortCircuitOperator 中的 运行:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)

然后我想使用这样的列表作为运算符的参数。例如,

run_task_after_blah = AfterBlahOperator(
    task_id='run-task-after-blah',
    ...,
    input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
    ...,
)

我希望 input_paths 等于 paths 但事实并非如此,因为渲染首先发生然后赋值,并且模板渲染在某种程度上转换了 xcom_pull return 到 stringified 列表(然后我的 AfterBlahOperator 插入将其分配为 JSON.

中元素的值

我尝试将 paths 连接成一个由一些分隔符分隔的字符串,然后将其推送到 XCom,然后在从 XCom 拉出时将其拆分回来,但是当 XCom 首先呈现时,我明白了,要么stringified 列表,当在模板内部调用 split 函数时,或者 paths 的原始连接字符串,如果 split 函数应用于参数 (如 "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';').

XCom 似乎非常适用于将单个值作为任务参数或多个值,当可以进一步处理提取的值时,但不适用于 multiple_values 转换为 'one' 作为任务参数。

有没有一种方法可以做到这一点而不必编写一个额外的函数来准确 return 这样的字符串列表? 或者也许我过度滥用 XCom,但 Airflow 中有许多运算符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是先前任务的结果,因此事先不知道)。

Jinja 呈现字符串,因此如果您通过模板获取 XCom,它始终是一个字符串。相反,您将需要获取您有权访问 TaskInstance 对象的 XCom。像这样:

class AfterBlahOperator(BaseOperator):

    def __init__(self, ..., input_task_id, *args, **kwargs):
        ...
        self.input_task_id = input_task_id
        super(AfterBlahOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
        for path in input_paths:
            ...

这类似于您在 PythonOperator 中获取它的方式,XCom docs 提供了一个示例。

请注意,当它可以硬编码到 DAG 中时,您仍然可以支持单独的 input_paths 参数,您只需要额外检查一下以查看从哪个参数读取值。

AfterBlahOperatorexecute 方法中调用 eval(input_paths)。这样,字符串化列表可以转换回列表

class AfterBlahOperator(BaseOperator):
template_fields = (input_paths)

def __init__(self, ..., *args, **kwargs):
    ...


def execute(self, context):
    paths = eval(input_paths) 
    for path in paths:
        ...

基于,我制作了一个快速工厂,允许使用最少的样板代码将 Jinja 模板字符串“解压”到任意运算符的列表中(因此您不需要创建子class 每一个运算符):

def operator_unpackjinja_factory(baseclass):
    class ChildUnpackJinja(baseclass):
        def __init__(self, xcompull_taskid, *args, xcompull_key="return_value", xcompull_attrname="objects", **kwargs):
            self.xcompull_taskid = xcompull_taskid
            self.xcompull_key = xcompull_key
            self.xcompull_attrname = xcompull_attrname
            self.baseclass = baseclass
            assert xcompull_attrname not in kwargs
            kwargs[xcompull_attrname] = "EMPTY"
            super(ChildUnpackJinja, self).__init__(*args, **kwargs)

        def execute(self, context):
            objects = context['ti'].xcom_pull(task_ids=self.xcompull_taskid, key=self.xcompull_key)
            setattr(self, self.xcompull_attrname, objects)
            return super(ChildUnpackJinja, self).execute(context)

    return ChildUnpackJinja

然后可以将此工厂的调用用作您最初想使用的任何运算符的直接替换,如下所示:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with Dag(...) as dag:
    gcpbucket_to_bigquery_stagingtable = operator_unpackjinja_factory(GCSToBigQueryOperator)(
        xcompull_taskid = "my_upstream_task_id",
        xcompull_attrname = "source_objects",
        task_id="gcpbucket_to_bigquery_stagingtable",
        dag=dag,
        bucket="my_bucket_name"
    )

正如所讨论的 in this question,我建议还创建 Memoize class 并将 @Memoize-装饰器添加到工厂:

class Memoize:
    def __init__(self, f):
        self.f = f
        self.memo = {}
    def __call__(self, *args):
        return self.memo.setdefault(args, self.f(*args))

@Memoize
def operator_unpackjinja_factory(baseclass):
   <content unchanged, see above>