如何使用Airflow获取并处理mysql条记录?

How to obtain and process mysql records using Airflow?

我需要

1. run a select query on MYSQL DB and fetch the records.              
2. Records are processed by python script.

我不确定我应该如何进行。 xcom是去这里的路吗?此外,MYSQLOperator 只执行查询,不获取记录。有没有我可以使用的内置传输操作员?我如何在这里使用 MYSQL 挂钩?

you may want to use a PythonOperator that uses the hook to get the data, apply transformation and ship the (now scored) rows back some other place.

谁能解释一下如何进行处理。

参考-http://markmail.org/message/x6nfeo6zhjfeakfe

def do_work():
    mysqlserver = MySqlHook(connection_id)
    sql = "SELECT * from table where col > 100 "
    row_count = mysqlserver.get_records(sql, schema='testdb')
    print row_count[0][0]

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=mysqlHook,
    dag=dag
)

这是正确的方法吗? 还有我们如何使用 xcoms 存储以下 MySqlOperator 的记录?'

t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)

当然,只需创建一个钩子或运算符并调用 get_records() 方法:https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html

在过去的 90 分钟里,我真的为此苦苦挣扎,这里有一个更明确的方式供新手遵循:

from airflow.hooks.mysql_hook import MySqlHook

def fetch_records():
  request = "SELECT * FROM your_table"
  mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
  connection = mysql_hook.get_conn()
  cursor = connection.cursor()
  cursor.execute(request)
  sources = cursor.fetchall()
  print(sources)

...your DAG() as dag: code

task = PythonOperator(
  task_id = 'fetch_records',
  python_callable = fetch_records
)

这 returns 记录您的数据库查询的内容。

我希望这对其他人有用。