在 Airflow 中传递 Python 可调用列表
Passing a list on a Python Callable in Airflow
我有一个 dag,它使用 CSV 文件列表,然后将它们创建到数据框中以供导入。
#CREATING CSV FILES
def csv_filess():
print("Creating CSV files....")
csv_files = []
for file in os.listdir(dataset_dir):
if file.endswith('.csv'):
csv_files.append(file)
print("***Step 3: CSV files created***")
return csv_files
#CREATING DATAFRAME
def create_df(csv_files):
print("Creating dataframe....")
df = {}
for file in csv_files:
try:
df[file] = pd.read_csv(data_path+file)
except UnicodeDecodeError:
df[file] = pd.read_csv(dataset_dir+file, encoding="ISO-8859-1")
print("***Step 4: CSV files created in df!***")
return df
t3 = PythonOperator(
task_id='create_csv',
python_callable=csv_filess, provide_context=True,
dag=dag)
t4 = PythonOperator(
task_id='create_df',
python_callable=create_df,
op_args = t3.output,
provide_context=True,
dag=dag)
但是我得到一个错误:
create_df() takes 1 positional argument but 4 were given
我认为是因为我必须先这样说?:
csv_files = csv_filess()
但是如何在 Airflow 任务上定义它?
从 PythonOperator 返回值会自动将输出存储为键为“return_value”的 XCom。因此,您将从任务 create_csv
中获得一个带有键 return_value
和值 ["file1.csv", "file2.csv", ...]
的 XCom。您可以在 Admin -> XComs 下检查 Airflow 中的所有 XComs,或者通过单击任务 -> Instance Details -> XCom 来检查每个任务。
在您的 create_df
任务中,您随后使用 t3.output
传递 create_csv
的输出。这是对之前创建的 XCom 的引用。当给 op_args
一个列表时,Airflow 会自动解压该列表。所以你必须接受带有 *
的多个参数才能做到这一点:
def create_df(*csv_files):
...
两个注意事项:
您可能有兴趣探索 Airflow's TaskFlow API,这将减少样板代码。您的代码将如下所示:
from airflow.decorators import task
with DAG(...) as dag:
@task
def csv_filess():
...
@task
def create_df(csv_files):
...
create_df(csv_filess())
(注意这里create_df
不需要解包。
最后请注意,PythonOperators 的返回值会自动存储为 XCom(默认情况下存储在 Airflow Metastore 中)。如果配置了 intended/custom XCom 后端,那很好,但是在返回 Pandas DataFrames 时我有点担心,因为这些数据帧可能非常大。
我有一个 dag,它使用 CSV 文件列表,然后将它们创建到数据框中以供导入。
#CREATING CSV FILES
def csv_filess():
print("Creating CSV files....")
csv_files = []
for file in os.listdir(dataset_dir):
if file.endswith('.csv'):
csv_files.append(file)
print("***Step 3: CSV files created***")
return csv_files
#CREATING DATAFRAME
def create_df(csv_files):
print("Creating dataframe....")
df = {}
for file in csv_files:
try:
df[file] = pd.read_csv(data_path+file)
except UnicodeDecodeError:
df[file] = pd.read_csv(dataset_dir+file, encoding="ISO-8859-1")
print("***Step 4: CSV files created in df!***")
return df
t3 = PythonOperator(
task_id='create_csv',
python_callable=csv_filess, provide_context=True,
dag=dag)
t4 = PythonOperator(
task_id='create_df',
python_callable=create_df,
op_args = t3.output,
provide_context=True,
dag=dag)
但是我得到一个错误:
create_df() takes 1 positional argument but 4 were given
我认为是因为我必须先这样说?:
csv_files = csv_filess()
但是如何在 Airflow 任务上定义它?
从 PythonOperator 返回值会自动将输出存储为键为“return_value”的 XCom。因此,您将从任务 create_csv
中获得一个带有键 return_value
和值 ["file1.csv", "file2.csv", ...]
的 XCom。您可以在 Admin -> XComs 下检查 Airflow 中的所有 XComs,或者通过单击任务 -> Instance Details -> XCom 来检查每个任务。
在您的 create_df
任务中,您随后使用 t3.output
传递 create_csv
的输出。这是对之前创建的 XCom 的引用。当给 op_args
一个列表时,Airflow 会自动解压该列表。所以你必须接受带有 *
的多个参数才能做到这一点:
def create_df(*csv_files):
...
两个注意事项:
您可能有兴趣探索 Airflow's TaskFlow API,这将减少样板代码。您的代码将如下所示:
from airflow.decorators import task
with DAG(...) as dag:
@task
def csv_filess():
...
@task
def create_df(csv_files):
...
create_df(csv_filess())
(注意这里create_df
不需要解包。
最后请注意,PythonOperators 的返回值会自动存储为 XCom(默认情况下存储在 Airflow Metastore 中)。如果配置了 intended/custom XCom 后端,那很好,但是在返回 Pandas DataFrames 时我有点担心,因为这些数据帧可能非常大。