Airflow: Return BashOperator output as a string for an ODBC connection

I'm fairly new to Airflow and Python. What I'm trying to do is take the result of a Bash command and compose the connection string from the returned value:

    import pyodbc as odbc
    import pandas as pd
    import datetime as dt
    from airflow.operators.bash import BashOperator
    from airflow.decorators import dag, task

    @dag(schedule_interval='0 15 10 * *', start_date=dt.datetime(2021, 10, 1), catchup=False)
    def my_dag2():
        # Read the Windows host IP from the WSL resolver configuration
        wsl_ip = BashOperator(
            task_id="wsl_ip",
            bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
            do_xcom_push=True
        )

        @task()
        def run():

            def busca_informes_cvm(ano, mes):
                url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{:4d}{:02d}.csv'.format(ano, mes)
                return pd.read_csv(url, sep=';')

            today = dt.date.today()
            ano = today.year
            mes = today.month - 1
            file_name_Comp = '{:4d}-{:02d}'.format(ano, mes)
            file_name = '{:4d}{:02d}.csv'.format(ano, mes)
            path_name = r'C:\Airflow\{:4d}{:02d}.csv'.format(ano, mes)

            conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= '+ wsl_ip +';Database=CVM;uid=Airflow;pwd=ubuntu')
            df = pd.read_sql_query('select max(DT_COMPTC) from Historico;', conn)
            left = df[''].str[:7]

            if file_name_Comp <= left[0]:
                print('Sair')
            else:
                informes_diarios = busca_informes_cvm(ano, mes)
                informes_diarios.to_csv(file_name, sep=';', index=False)
                db_view_nm = '[dbo].[Bulk]'
                qry = "BULK INSERT " + db_view_nm + " FROM '" + path_name + "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '0x0a', FIRSTROW = 2,ROWS_PER_BATCH = 100000 )"
                cursor = conn.cursor()
                success = cursor.execute(qry)
                conn.commit()
                cursor.close()
                print('Concluído')

        execute = run()

    etl_dag = my_dag2()

I need to find a way to convert wsl_ip to a string. Any help would be appreciated.

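To get the output of the "wsl_ip" task, you can use the .output attribute that Airflow exposes on every operator. This attribute returns an XComArg, which is an abstraction over the classic xcom_pull() method (see the Airflow documentation).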

You could try something like this:

    wsl_ip = BashOperator(
        task_id="wsl_ip",
        bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
        do_xcom_push=True
    )

    @task()
    def run(ip):

        ...

        conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= '+ ip +';Database=CVM;uid=Airflow;pwd=ubuntu')

        ...

    # Passing .output hands the XCom pushed by "wsl_ip" to the TaskFlow function
    execute = run(ip=wsl_ip.output)

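The run TaskFlow function now takes as its input the XCom pushed by your BashOperator task, which is the last line the Bash command wrote to stdout and arrives as a string when the task runs. Using .output this way also automatically creates a task dependency between the "wsl_ip" and "run" tasks.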
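
Because the value that reaches run is whatever the Bash command printed, it may be worth normalizing it before building the connection string. The following is only a minimal sketch: `build_conn_str` is a hypothetical helper name (it is not part of Airflow or pyodbc), the IP address in the example call is made up, and the driver, database and credentials are simply copied from the question.

```python
def build_conn_str(ip) -> str:
    """Build the ODBC connection string from the value pushed by the "wsl_ip" task.

    Hypothetical helper for illustration; not part of Airflow or pyodbc.
    """
    # Coerce to str and drop stray whitespace or newlines around the address.
    server = str(ip).strip()
    if not server:
        raise ValueError("empty server address received from the wsl_ip task")
    # The first literal is a plain string, so its braces are kept as-is.
    return (
        "Driver={ODBC Driver 17 for SQL Server};"
        f"Server={server};"
        "Database=CVM;uid=Airflow;pwd=ubuntu"
    )


# Example with a made-up address, including the kind of padding a shell command may leave.
print(build_conn_str(" 172.22.0.1\n"))
```

Inside run, this would be used as `conn = odbc.connect(build_conn_str(ip))`. Failing early on an empty value makes it easier to see that the Bash task produced no output, rather than getting a less obvious driver error from the connection attempt.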