Airflow:将 BashOperator 的返回值作为 ODBC 连接字符串
Airflow: Return BashOperator output as a string for an ODBC connection
我对 Airflow 和 Python 都很陌生。我想做的是获取 Bash 命令的结果,并用该返回值组成连接字符串:
import pyodbc as odbc
import pandas as pd
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.decorators import dag, task
@dag(schedule_interval='0 15 10 * *', start_date=dt.datetime(2021, 10, 1), catchup=False)
def my_dag2():
    """Monthly load of CVM daily fund reports into the ``CVM`` database.

    Runs at 15:00 on the 10th of each month (cron ``0 15 10 * *``).
    ``wsl_ip`` reads the first nameserver address from /etc/resolv.conf
    (presumably the WSL host running SQL Server -- confirm) and pushes it to
    XCom; ``run`` receives that value through ``wsl_ip.output``.
    """
    wsl_ip = BashOperator(
        task_id="wsl_ip",
        # ``$2`` keeps only the address from the "nameserver <ip>" line;
        # without it awk prints the whole line into the connection string.
        bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
        do_xcom_push=True,
    )

    @task()
    def run(ip):
        """Download last month's CVM file and BULK INSERT it if not yet loaded.

        :param ip: SQL Server host. At runtime this is the string pushed to
            XCom by ``wsl_ip``, not the BashOperator object itself.
        """

        def busca_informes_cvm(ano, mes):
            url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{:4d}{:02d}.csv'.format(ano, mes)
            return pd.read_csv(url, sep=';')

        # Previous calendar month. Stepping back one day from the 1st rolls
        # January over to December of the prior year instead of month 0.
        last_month = dt.date.today().replace(day=1) - dt.timedelta(days=1)
        ano, mes = last_month.year, last_month.month
        file_name_Comp = '{:4d}-{:02d}'.format(ano, mes)
        file_name = '{:4d}{:02d}.csv'.format(ano, mes)
        # NOTE(review): the CSV is written to ``file_name`` (relative to the
        # worker's cwd) but BULK INSERT reads ``path_name``; confirm these are
        # the same file as seen by SQL Server.
        path_name = r'C:\Airflow\{:4d}{:02d}.csv'.format(ano, mes)

        conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
        try:
            df = pd.read_sql_query('select max(DT_COMPTC) from Historico;', conn)
            # The result is one unnamed column, so read it by position.
            # str() handles both string and date values ('YYYY-MM-DD...');
            # a NULL max (empty table) means nothing is loaded yet.
            latest = df.iloc[0, 0]
            if not pd.isna(latest) and file_name_Comp <= str(latest)[:7]:
                print('Sair')
            else:
                informes_diarios = busca_informes_cvm(ano, mes)
                informes_diarios.to_csv(file_name, sep=';', index=False)
                db_view_nm = '[dbo].[Bulk]'
                qry = "BULK INSERT " + db_view_nm + " FROM '" + path_name + "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '0x0a', FIRSTROW = 2,ROWS_PER_BATCH = 100000 )"
                cursor = conn.cursor()
                try:
                    cursor.execute(qry)
                    conn.commit()
                finally:
                    cursor.close()
                print('Concluído')
        finally:
            conn.close()

    # Passing the XComArg both delivers the IP string at runtime and makes
    # "run" depend on "wsl_ip".
    execute = run(ip=wsl_ip.output)


etl_dag = my_dag2()
我需要找到一种方法将 wsl_ip 转换为字符串。任何帮助将不胜感激。
要获取“wsl_ip”任务的输出,可以使用 Airflow 为每个算子(operator)公开的 .output 属性。该属性是对经典 xcom_pull() 方法的一层抽象,称为 XComArg(请参阅 Airflow 关于 XComArg 的文档)。
您可以尝试这样的操作:
wsl_ip = BashOperator(
    task_id="wsl_ip",
    # ``$2`` keeps only the address from the "nameserver <ip>" line; without it
    # awk prints the whole line, which would break the connection string.
    bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
    do_xcom_push=True,
)


@task()
def run(ip):
    """Connect to SQL Server at ``ip``: the string XCom pushed by ``wsl_ip``."""
    ...
    conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
    ...


# ``wsl_ip.output`` is an XComArg: it resolves to the pushed string at runtime
# and also sets the wsl_ip >> run dependency.
execute = run(ip=wsl_ip.output)
名为 run 的 TaskFlow 函数现在接收由 BashOperator 任务推送的 XCom 作为输入,该值是一个字符串。以这种方式使用 .output 还会自动在“wsl_ip”和“run”任务之间建立任务依赖关系。
我对 Airflow 和 Python 都很陌生。我想做的是获取 Bash 命令的结果,并用该返回值组成连接字符串:
import pyodbc as odbc
import pandas as pd
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.decorators import dag, task
@dag(schedule_interval='0 15 10 * *', start_date=dt.datetime(2021, 10, 1), catchup=False)
def my_dag2():
    """Monthly load of CVM daily fund reports into the ``CVM`` database.

    Runs at 15:00 on the 10th of each month (cron ``0 15 10 * *``).
    ``wsl_ip`` reads the first nameserver address from /etc/resolv.conf
    (presumably the WSL host running SQL Server -- confirm) and pushes it to
    XCom; ``run`` receives that value through ``wsl_ip.output``.
    """
    wsl_ip = BashOperator(
        task_id="wsl_ip",
        # ``$2`` keeps only the address from the "nameserver <ip>" line;
        # without it awk prints the whole line into the connection string.
        bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
        do_xcom_push=True,
    )

    @task()
    def run(ip):
        """Download last month's CVM file and BULK INSERT it if not yet loaded.

        :param ip: SQL Server host. At runtime this is the string pushed to
            XCom by ``wsl_ip``, not the BashOperator object itself.
        """

        def busca_informes_cvm(ano, mes):
            url = 'http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_{:4d}{:02d}.csv'.format(ano, mes)
            return pd.read_csv(url, sep=';')

        # Previous calendar month. Stepping back one day from the 1st rolls
        # January over to December of the prior year instead of month 0.
        last_month = dt.date.today().replace(day=1) - dt.timedelta(days=1)
        ano, mes = last_month.year, last_month.month
        file_name_Comp = '{:4d}-{:02d}'.format(ano, mes)
        file_name = '{:4d}{:02d}.csv'.format(ano, mes)
        # NOTE(review): the CSV is written to ``file_name`` (relative to the
        # worker's cwd) but BULK INSERT reads ``path_name``; confirm these are
        # the same file as seen by SQL Server.
        path_name = r'C:\Airflow\{:4d}{:02d}.csv'.format(ano, mes)

        conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
        try:
            df = pd.read_sql_query('select max(DT_COMPTC) from Historico;', conn)
            # The result is one unnamed column, so read it by position.
            # str() handles both string and date values ('YYYY-MM-DD...');
            # a NULL max (empty table) means nothing is loaded yet.
            latest = df.iloc[0, 0]
            if not pd.isna(latest) and file_name_Comp <= str(latest)[:7]:
                print('Sair')
            else:
                informes_diarios = busca_informes_cvm(ano, mes)
                informes_diarios.to_csv(file_name, sep=';', index=False)
                db_view_nm = '[dbo].[Bulk]'
                qry = "BULK INSERT " + db_view_nm + " FROM '" + path_name + "' WITH (FIELDTERMINATOR = ';', ROWTERMINATOR = '0x0a', FIRSTROW = 2,ROWS_PER_BATCH = 100000 )"
                cursor = conn.cursor()
                try:
                    cursor.execute(qry)
                    conn.commit()
                finally:
                    cursor.close()
                print('Concluído')
        finally:
            conn.close()

    # Passing the XComArg both delivers the IP string at runtime and makes
    # "run" depend on "wsl_ip".
    execute = run(ip=wsl_ip.output)


etl_dag = my_dag2()
我需要找到一种方法将 wsl_ip 转换为字符串。任何帮助将不胜感激。
要获取“wsl_ip”任务的输出,可以使用 Airflow 为每个算子(operator)公开的 .output 属性。该属性是对经典 xcom_pull() 方法的一层抽象,称为 XComArg(请参阅 Airflow 关于 XComArg 的文档)。
您可以尝试这样的操作:
wsl_ip = BashOperator(
    task_id="wsl_ip",
    # ``$2`` keeps only the address from the "nameserver <ip>" line; without it
    # awk prints the whole line, which would break the connection string.
    bash_command="grep -m 1 nameserver /etc/resolv.conf | awk '{print $2}'",
    do_xcom_push=True,
)


@task()
def run(ip):
    """Connect to SQL Server at ``ip``: the string XCom pushed by ``wsl_ip``."""
    ...
    conn = odbc.connect('Driver={ODBC Driver 17 for SQL Server};Server= ' + ip + ';Database=CVM;uid=Airflow;pwd=ubuntu')
    ...


# ``wsl_ip.output`` is an XComArg: it resolves to the pushed string at runtime
# and also sets the wsl_ip >> run dependency.
execute = run(ip=wsl_ip.output)
名为 run 的 TaskFlow 函数现在接收由 BashOperator 任务推送的 XCom 作为输入,该值是一个字符串。以这种方式使用 .output 还会自动在“wsl_ip”和“run”任务之间建立任务依赖关系。