气流:无法将数据从 mysql 数据库传输到 csv 文件

Airflow: Not able to transfer data from myql database to csv file

我正在研究 Airflow,我试图将数据从 mysql 数据库传输到 csv 文件。下面是代码和函数

from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator

from dbextract import extract_data
from dbpost_processing import dbpost_process

default_args = {"owner":"airflow","start_date":datetime(2021,7,10)}
with DAG(dag_id="dbworkflow2",default_args=default_args,schedule_interval=None) as dag:
   
    extract = MySqlOperator(
        task_id='extract',
        mysql_conn_id="mysql_db1", 
        sql = extract_data
        )
        
    dbpost_process = PythonOperator(
        task_id = "dbpost_process",
        python_callable = dbpost_process
        )   
        
    extract >> dbpost_process
import pandas as pd
def extract_data():
    df=pd.read_sql('SELECT * FROM new_table', mysql_conn_id)
import pandas as pd
def dbpost_process():
    df.to_csv('~/op_files/sample3.csv', index=False)

在 extract_data 步骤

中出现函数对象不可迭代的错误

MySQL 文件中的 sql 不是可调用的,它应该是字符串或字符串列表。您还试图在任务之间传递 panda 数据框,但它不会工作,因为任务可能(并且很可能会)运行 在不同进程中的不同机器上。

任务之间交换数据的方式通常是通过 XComs(对于少量数据,它应该通过 DB,对于大量数据,您可以添加自定义 XCom 后端并通过例如 S3 或地面站)。

但是在您的情况下,您不需要有两个单独的 tasks/operators。相反,您应该在 Python 运算符中使用 MySQL Hook 来读取数据并在相同的任务中处理它。将该作业拆分为两个单独的任务是没有意义的 - MySQL Operator 实际上是执行 DDL 或 DML 操作,而不是提取数据(正是因为 Airflow Operators 是孤立工作的)

Airflow 取而代之的是 Hooks 的概念,它提供了 API 您可以用来 运行 您的查询并在同一个 Python 可调用操作符中处理数据。甚至最近都可以使用 @task 装饰器来完成,因此它非常简单并且更容易编写 - 特别是如果您习惯于编写函数 Python.

https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

但在你的情况下,你甚至不需要这样做,因为你想使用 pandas 与 MySQL 数据库通信,所以你甚至不需要使用 Hook。这样的东西就足够了。在这种情况下你不应该使用 conn_id,但你需要在那里传递一个 SQL-Alchemy 兼容的连接字符串。不确定 Airflow 连接 url 是否有效,但如果有效,那么您可以使用 Connection.get_uri()(也许您需要稍微调整一下 URI)。

类似的东西(这是对你的启发,不是可编译的代码,所以你需要弄清楚细节)应该可行:

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def my_dag():
    @task()
    def my_extraction():
         df =  pd.read_sql('SELECT *', Connnection.get("my_connection_id").get_uri()) 
         file = post_process(df)

现在 - 你还必须对该文件做一些事情,因为一旦任务完成,本地数据将不可用(除非你使用 LocalExecutor) - 你可以将 CSV 发送到某个地方(为此你可以使用任何 Hook - 例如 S3Hook)。