如何在 MySqlOperator 中使用 airflow xcoms
How to use airflow xcoms with MySqlOperator
def mysql_operator_test():
DEFAULT_DATE = datetime(2017, 10, 9)
t = MySqlOperator(
task_id='basic_mysql',
sql="SELECT count(*) from table 1 where id>100;",
mysql_conn_id='mysql_default',
dag=dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False)
run_this = PythonOperator(
task_id='getRecoReq',
python_callable=mysql_operator_test,
# xcom_push=True,
dag=dag)
task2 = PythonOperator(
task_id= 'mysql_select',
provide_context=True,
python_callable = blah,
templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" },
dag=dag)
run_this.set_downstream(task2)
我想使用 xcoms 捕获 MySqlOperator 返回的计数。有人可以就此给予指导吗?
你非常接近!但是,您问这个问题的方式有点反模式。您不想在 Airflow 中跨任务共享数据。此外,您不想像在 mysql_operator_test
中那样使用运算符。很诱人,我刚开始的时候也做过同样的事情。
我尝试了与此非常相似的方法,但使用的是 SFTP 连接。我最终只是在 PythonOperator
中做了所有事情并使用了底层的钩子。
我建议您在 python_callable
中使用 MySQLHook
。像这样:
def count_mysql_and_then_use_the_count():
"""
Returns an SFTP connection created using the SSHHook
"""
mysql_hook = MySQLHook(...)
cur = conn.cursor()
cur.execute("""SELECT count(*) from table 1 where id>100""")
for count in cur:
# Do something with the count...
我不确定这是否会按原样工作,但我的想法是 在你的 Python 可调用对象中使用一个钩子,我不使用 MySQLHook
经常,但我用 SSHHook
做到了,而且效果很好。
def mysql_operator_test():
DEFAULT_DATE = datetime(2017, 10, 9)
t = MySqlOperator(
task_id='basic_mysql',
sql="SELECT count(*) from table 1 where id>100;",
mysql_conn_id='mysql_default',
dag=dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False)
run_this = PythonOperator(
task_id='getRecoReq',
python_callable=mysql_operator_test,
# xcom_push=True,
dag=dag)
task2 = PythonOperator(
task_id= 'mysql_select',
provide_context=True,
python_callable = blah,
templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" },
dag=dag)
run_this.set_downstream(task2)
我想使用 xcoms 捕获 MySqlOperator 返回的计数。有人可以就此给予指导吗?
你非常接近!但是,您问这个问题的方式有点反模式。您不想在 Airflow 中跨任务共享数据。此外,您不想像在 mysql_operator_test
中那样使用运算符。很诱人,我刚开始的时候也做过同样的事情。
我尝试了与此非常相似的方法,但使用的是 SFTP 连接。我最终只是在 PythonOperator
中做了所有事情并使用了底层的钩子。
我建议您在 python_callable
中使用 MySQLHook
。像这样:
def count_mysql_and_then_use_the_count():
"""
Returns an SFTP connection created using the SSHHook
"""
mysql_hook = MySQLHook(...)
cur = conn.cursor()
cur.execute("""SELECT count(*) from table 1 where id>100""")
for count in cur:
# Do something with the count...
我不确定这是否会按原样工作,但我的想法是 在你的 Python 可调用对象中使用一个钩子,我不使用 MySQLHook
经常,但我用 SSHHook
做到了,而且效果很好。