How to fetch and process MySQL records using Airflow?
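I need to:
1. Run a SELECT query on a MySQL DB and fetch the records.
2. Process the records with a Python script.
I am not sure how to proceed. Is XCom the way to go here? Also, MySqlOperator only executes the query; it does not fetch the records. Is there a built-in transfer operator I can use? How can I use the MySQL hook here?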
"You may want to use a PythonOperator that uses the hook to get the data, apply transformation and ship the (now scored) rows back some other place."
Can someone explain how to go about this?
Reference: http://markmail.org/message/x6nfeo6zhjfeakfe
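    # Import paths below are the Airflow 1.x ones; in Airflow 2 they are
    # airflow.providers.mysql.hooks.mysql and airflow.operators.python.
    from airflow.hooks.mysql_hook import MySqlHook
    from airflow.operators.python_operator import PythonOperator

    def do_work():
        # connection_id is the Airflow connection ID, defined elsewhere
        mysqlserver = MySqlHook(mysql_conn_id=connection_id, schema='testdb')
        sql = "SELECT * from table where col > 100 "
        # get_records() returns the result set as a list of row tuples
        row_count = mysqlserver.get_records(sql)
        print(row_count[0][0])

    callMYSQLHook = PythonOperator(
        task_id='fetch_from_testdb',
        python_callable=do_work,
        dag=dag
    )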
Is this the right approach?
Also, how can we use XComs to store the records returned by the following MySqlOperator?
    t = MySqlOperator(
        mysql_conn_id='mysql_default',
        task_id='basic_mysql',
        sql="SELECT count(*) from table1 where id > 10",
        dag=dag)
Sure, just create a hook or operator and call the get_records() method: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html
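To make that concrete, here is a minimal sketch of a task callable that calls get_records() on the hook, processes the rows in Python, and returns the result. It assumes the Airflow 2 import path for MySqlHook and a connection named mysql_default. The column name col, the helper count_rows_above, and the threshold are hypothetical and only there for illustration. When a PythonOperator runs a callable, its return value is pushed to XCom by default, which is one way to hand a small result to a later task.

```python
def count_rows_above(rows, column_index=0, threshold=100):
    """Hypothetical processing step: count rows whose value exceeds threshold."""
    return sum(1 for row in rows if row[column_index] > threshold)


def fetch_and_process(hook=None):
    """Task callable: query MySQL through the hook, then process the rows."""
    if hook is None:
        # Imported lazily, inside the task, so this module can also be loaded
        # where the MySQL provider is not installed (assumed Airflow 2 path).
        from airflow.providers.mysql.hooks.mysql import MySqlHook

        hook = MySqlHook(mysql_conn_id="mysql_default", schema="testdb")

    # get_records() returns the whole result set as a list of row tuples.
    rows = hook.get_records(
        "SELECT col FROM table1 WHERE id > %s", parameters=(10,)
    )
    # The returned value is what PythonOperator pushes to XCom.
    return count_rows_above(rows)
```

Pass fetch_and_process as python_callable to a PythonOperator. The optional hook argument exists only so the function can be exercised with a stand-in object outside Airflow. XCom is meant for small values, so return a summary rather than a large result set.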
I really struggled with this for the past 90 minutes, so here is a more explicit way for newcomers to follow:
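    from airflow.hooks.mysql_hook import MySqlHook  # Airflow 2: airflow.providers.mysql.hooks.mysql
    from airflow.operators.python_operator import PythonOperator  # Airflow 2: airflow.operators.python

    def fetch_records():
        request = "SELECT * FROM your_table"
        mysql_hook = MySqlHook(mysql_conn_id='the_connection_name_sourced_from_the_ui', schema='specific_db')
        # Use the underlying DB-API connection and cursor directly
        connection = mysql_hook.get_conn()
        cursor = connection.cursor()
        cursor.execute(request)
        sources = cursor.fetchall()
        print(sources)

    # ...your DAG() as dag: code (the task below goes inside that block)
    task = PythonOperator(
        task_id='fetch_records',
        python_callable=fetch_records
    )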
This fetches the rows returned by your database query and prints them to the task log.
I hope this is useful to others.
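On the XCom part of the question, a downstream task can read whatever an upstream callable returned. The sketch below assumes fetch_records is changed to end with `return sources`, that the result is small and serializable, and that Airflow 2 is in use, where a PythonOperator callable with a `ti` parameter receives the task instance automatically. The function name process_records is hypothetical.

```python
def process_records(ti):
    """Downstream task callable: pull the upstream rows from XCom and process them."""
    # xcom_pull returns the value the upstream callable returned,
    # or None if that task pushed nothing.
    rows = ti.xcom_pull(task_ids="fetch_records")
    if not rows:
        print("No rows received from fetch_records")
        return 0

    for row in rows:
        # Replace this with the real per-row processing.
        print(row)
    return len(rows)
```

Wire it up as a second PythonOperator with python_callable=process_records, set to run after the fetch_records task. For large result sets it is usually better to query and process inside a single task, since XCom is not designed to carry bulk data.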