Triggering an Airflow DAG based on filesystem changes
I am trying to write a pipeline in which a Postgres database should be updated with the contents of a CSV file whenever one is dropped into a folder. I have written a DAG that creates the table and pushes the CSV contents when triggered from the web UI. Here is the code:
from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG',
         description='This DAG is for writing data to postgres.',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
        )
        """,
    )

    def my_func():
        print('Pushing data in database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)
        cur = conn.cursor()
        print(cur)
        with open('test.csv', 'r') as f:
            next(f)  # Skip the header row.
            cur.copy_from(f, 'users', sep=',')
            conn.commit()
        print(conn)
        print('DONE!!!!!!!!!!!.')

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    create_table >> python_task
What I cannot figure out is how to trigger the task when a CSV is manually pasted or copied into the folder.
Any help would be appreciated; thanks in advance.
It sounds like you are looking for filesystem write events.
For a lower-level, Linux-only approach, look at inotify: https://pypi.org/project/inotify/
For a higher-level implementation that also works on Mac or Windows: https://pypi.org/project/watchdog/
The idea is to register event watchers/handlers that are notified of file/directory modifications. Each event carries the path of the newly created or modified file.
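As a rough sketch (not from the original answer) of how that could drive Airflow, the watcher below uses watchdog to monitor a folder and kicks off the question's DAG through the Airflow 1.x CLI command airflow trigger_dag; the folder path and handler name are placeholders:

import subprocess
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

WATCHED_DIR = '/path/to/watched/folder'  # assumption: wherever the CSV lands

class CsvCreatedHandler(FileSystemEventHandler):
    def on_created(self, event):
        # React only to new CSV files, not directories or other file types.
        if event.is_directory or not event.src_path.endswith('.csv'):
            return
        print('New CSV detected: %s' % event.src_path)
        # Trigger the DAG from the question via the Airflow CLI.
        subprocess.run(['airflow', 'trigger_dag', 'Write_data_to_PG'], check=True)

if __name__ == '__main__':
    observer = Observer()
    observer.schedule(CsvCreatedHandler(), WATCHED_DIR, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

The tradeoff is that this watcher is a separate long-running process you have to deploy and supervise yourself, which is what makes the FileSensor approach in the next answer more convenient.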
It turns out Airflow has a module for exactly this need. I solved the problem using the FileSensor that Airflow itself provides.
According to the documentation:
FileSensor Waits for a file or folder to land in a filesystem.
If the path given is a directory then this sensor will only return true if
any files exist inside it (either directly, or within a subdirectory)
Here is the modified code. It waits for a file named test.csv and only moves on to the next task once it finds the file in the Airflow folder (or any other folder; you need to specify the path):
from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG',
         description='This DAG is for writing data to postgres.',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1),
         catchup=False) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
        )
        """,
    )

    def my_func():
        print('Pushing data to the database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)
        cur = conn.cursor()
        print(cur)
        with open('test.csv', 'r') as f:
            next(f)  # Skip the header row.
            cur.copy_from(f, 'users', sep=',')
            conn.commit()
        print(conn)
        print('DONE!!!!!!!!!!!.')

    # Poll every 10 seconds for test.csv; fs_conn_id must name an Airflow
    # connection whose base path points at the folder being watched.
    file_sensing_task = FileSensor(task_id='sense_the_csv',
                                   filepath='test.csv',
                                   fs_conn_id='my_file_system',
                                   poke_interval=10)

    python_task = PythonOperator(task_id='populate_data', python_callable=my_func)

    create_table >> file_sensing_task >> python_task
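One setup detail worth calling out: fs_conn_id='my_file_system' must name an existing Airflow connection of type File (path), with the base folder stored under the "path" key of the connection's Extra field; FileSensor resolves filepath relative to that base. As an assumption of how that might look with the Airflow 1.10 CLI (the connection id and folder are placeholders):

airflow connections --add --conn_id my_file_system --conn_type fs --conn_extra '{"path": "/path/to/watched/folder"}'

If you omit fs_conn_id entirely, the sensor uses the stock fs_default connection, which resolves paths from the filesystem root.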