如何将使用 PythonOperator 的 DAG 转换为使用 PostgresOperator 的 DAG？
How to convert a DAG that uses PythonOperator into a DAG that uses PostgresOperator?
我编写了一个使用 PythonOperator 的 Airflow DAG。现在我需要在不改变 DAG 功能的前提下，让同一个 DAG 改用 PostgreSQL 运算符（PostgresOperator）。下面是使用 PythonOperator 的代码。我应该如何用 PostgresOperator 替换 PythonOperator？或者，一个 DAG 中可以同时使用两种不同的运算符吗？
from airflow import DAG
from airflow.models import dag
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
import os
# Absolute directory containing this DAG file (symlinks resolved via realpath).
script_dir_path = os.path.dirname(os.path.realpath(__file__))
import time
from time import sleep
from xlwt import Workbook
import pandas as pd
from csv import writer
from csv import DictReader
from datetime import datetime
from selenium import webdriver
import psycopg2
# Firefox options for the remote Selenium session opened by extract_tickers.
opt = webdriver.FirefoxOptions()

# NOTE(review): wb, sheet1, i and j are not used by any code visible in this
# file; they are kept only in case something else relies on them. Note that
# the Workbook is created on every parse of this DAG file.
wb = Workbook()
sheet1 = wb.add_sheet('Sheet 1', cell_overwrite_ok=False)
i = 0
j = 0

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

dag = DAG(
    'Yahoo_Finance',
    default_args=default_args,
    description='fetching ticker symbol',
    catchup=False,
    # Static start date: datetime.now() moves forward on every parse, so the
    # first schedule interval never completes and the DAG may never run.
    # With catchup=False only the most recent interval is scheduled.
    start_date=datetime(2021, 1, 1),
    # Once a day at 07:00. The previous '* 7 * * *' fired every minute
    # between 07:00 and 07:59 (60 runs per day).
    schedule_interval='0 7 * * *',
)
def _parse_quote_header(header_text):
    """Split a Yahoo Finance quote heading into ``(ticker, company_name)``.

    The heading is expected to look like ``"Company Name (TICKER)"``. The ticker
    is taken from the last parenthesised group, so company names that contain
    parentheses themselves are kept intact.

    Returns ``None`` when the heading contains no ``"("`` so the caller can skip
    the row instead of storing a truncated string.
    """
    open_paren = header_text.rfind("(")
    if open_paren == -1:
        return None
    ticker = header_text[open_paren + 1:].rstrip(")").strip()
    company_name = header_text[:open_paren].strip()
    return ticker, company_name


def _lookup_symbol(driver, symbol):
    """Search Yahoo Finance for *symbol* and return the quote heading text.

    Returns ``None`` when no heading element is found on the resulting page.
    """
    # By.ID / By.XPATH work on Selenium 3 and 4; the find_element_by_* helpers
    # were removed in Selenium 4.3.
    from selenium.webdriver.common.by import By

    # Fixed waits give the remote browser time to render between steps.
    time.sleep(8)
    search_box = driver.find_element(By.ID, 'yfin-usr-qry')
    time.sleep(4)
    search_box.send_keys(symbol)
    time.sleep(4)
    driver.find_element(By.XPATH, '//*[@id="header-desktop-search-button"]').click()
    time.sleep(15)
    headings = driver.find_elements(
        By.XPATH, '//*[@id="quote-header-info"]/div[2]/div[1]/div[1]/h1'
    )
    if not headings:
        return None
    return str(headings[0].text)


def extract_tickers(**context):
    """Resolve every symbol in ``./fromlocal/EQUITY_L.csv`` on Yahoo Finance and
    insert the resulting ticker and company name into ``tickers1``.

    Used as a PythonOperator callable. ``**context`` accepts (and ignores) the
    task context that Airflow 1.x passes as keyword arguments when
    ``provide_context=True``; without it the call raises ``TypeError``.

    All rows are committed in one transaction at the end. The database
    connection and the remote Selenium session are released even if a step
    fails.
    """
    conn = psycopg2.connect(dbname='postgres', user='airflow', password='airflow', host='postgres')
    try:
        # Open the CSV before starting a browser session so a missing file
        # fails fast without allocating a remote Selenium node.
        with conn.cursor() as cur, open(r'./fromlocal/EQUITY_L.csv', newline='') as read_obj:
            driver = webdriver.Remote("http://selenium:4444/wd/hub", options=opt)
            try:
                driver.get("https://finance.yahoo.com")
                for row in DictReader(read_obj):
                    header_text = _lookup_symbol(driver, row['SYMBOL'])
                    if header_text is None:
                        print("no quote heading found for symbol: " + row['SYMBOL'])
                        continue
                    print("comapny name: " + header_text)

                    parsed = _parse_quote_header(header_text)
                    if parsed is None:
                        print("could not parse ticker from: " + header_text)
                        continue
                    ticker, company_name = parsed
                    print("extracted ticker: " + ticker)

                    # Parameterised query: psycopg2 does the quoting, so names
                    # containing apostrophes need no manual escaping and the
                    # scraped text cannot alter the SQL.
                    cur.execute(
                        "INSERT INTO tickers1 (keyword,companyName) values (%s, %s)",
                        (ticker, company_name),
                    )
            finally:
                # Always release the remote browser session.
                driver.quit()
        conn.commit()
    finally:
        conn.close()
# Runs at module level, i.e. every time this DAG file is parsed.
print(script_dir_path)

# The callable does not use the task context, so provide_context is not set:
# on Airflow 1.x it injects the context as keyword arguments (a TypeError for
# a callable without **kwargs), and on Airflow 2.x it is deprecated and ignored.
Yahoo_Finance = PythonOperator(
    task_id='extract_tickers',
    python_callable=extract_tickers,
    dag=dag,
)
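`PostgresOperator` 只能运行 SQL。您的代码读取 CSV、查询网站（Yahoo Finance），并将结果加载到数据库中——这些都无法用 `PostgresOperator` 做到。不过，同一个 DAG 中完全可以混用不同的运算符，例如用 `PythonOperator` 抓取数据，再用 `PostgresOperator` 执行纯 SQL 步骤。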
您可以做的是用 `PostgresHook` 替换对 `psycopg2` 的直接调用。
Hook 是对 `psycopg2` 的封装，向您提供可以直接调用的方法。例如，这意味着您无需自己处理如何连接到 Postgres：只需在 Admin -> Connections 中定义连接，然后在 Hook 中引用该连接的名称（conn_id）：
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_tickers():
    """Execute SQL on the Airflow connection whose conn_id is ``postgres_default``.

    The connection is obtained through ``PostgresHook`` instead of a direct
    ``psycopg2.connect`` call, so no credentials appear in the DAG code.
    """
    from contextlib import closing

    hook = PostgresHook(postgres_conn_id="postgres_default")
    # closing() is what actually closes the connection: psycopg2's
    # ``with conn:`` only commits (or rolls back on error) the transaction and
    # leaves the connection open.
    with closing(hook.get_conn()) as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute("Your SQL CODE")
要了解 Hook 提供的其他方法，请查看该 Hook 的源代码。
我编写了一个使用 PythonOperator 的 Airflow DAG。现在我需要在不改变 DAG 功能的前提下，让同一个 DAG 改用 PostgreSQL 运算符（PostgresOperator）。下面是使用 PythonOperator 的代码。我应该如何用 PostgresOperator 替换 PythonOperator？或者，一个 DAG 中可以同时使用两种不同的运算符吗？
from airflow import DAG
from airflow.models import dag
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
import os
# Absolute directory containing this DAG file (symlinks resolved via realpath).
script_dir_path = os.path.dirname(os.path.realpath(__file__))
import time
from time import sleep
from xlwt import Workbook
import pandas as pd
from csv import writer
from csv import DictReader
from datetime import datetime
from selenium import webdriver
import psycopg2
# Firefox options for the remote Selenium session opened by extract_tickers.
opt = webdriver.FirefoxOptions()

# NOTE(review): wb, sheet1, i and j are not used by any code visible in this
# file; they are kept only in case something else relies on them. Note that
# the Workbook is created on every parse of this DAG file.
wb = Workbook()
sheet1 = wb.add_sheet('Sheet 1', cell_overwrite_ok=False)
i = 0
j = 0

default_args = {
    'owner': 'airflow',
    'retries': 1,
}

dag = DAG(
    'Yahoo_Finance',
    default_args=default_args,
    description='fetching ticker symbol',
    catchup=False,
    # Static start date: datetime.now() moves forward on every parse, so the
    # first schedule interval never completes and the DAG may never run.
    # With catchup=False only the most recent interval is scheduled.
    start_date=datetime(2021, 1, 1),
    # Once a day at 07:00. The previous '* 7 * * *' fired every minute
    # between 07:00 and 07:59 (60 runs per day).
    schedule_interval='0 7 * * *',
)
def _parse_quote_header(header_text):
    """Split a Yahoo Finance quote heading into ``(ticker, company_name)``.

    The heading is expected to look like ``"Company Name (TICKER)"``. The ticker
    is taken from the last parenthesised group, so company names that contain
    parentheses themselves are kept intact.

    Returns ``None`` when the heading contains no ``"("`` so the caller can skip
    the row instead of storing a truncated string.
    """
    open_paren = header_text.rfind("(")
    if open_paren == -1:
        return None
    ticker = header_text[open_paren + 1:].rstrip(")").strip()
    company_name = header_text[:open_paren].strip()
    return ticker, company_name


def _lookup_symbol(driver, symbol):
    """Search Yahoo Finance for *symbol* and return the quote heading text.

    Returns ``None`` when no heading element is found on the resulting page.
    """
    # By.ID / By.XPATH work on Selenium 3 and 4; the find_element_by_* helpers
    # were removed in Selenium 4.3.
    from selenium.webdriver.common.by import By

    # Fixed waits give the remote browser time to render between steps.
    time.sleep(8)
    search_box = driver.find_element(By.ID, 'yfin-usr-qry')
    time.sleep(4)
    search_box.send_keys(symbol)
    time.sleep(4)
    driver.find_element(By.XPATH, '//*[@id="header-desktop-search-button"]').click()
    time.sleep(15)
    headings = driver.find_elements(
        By.XPATH, '//*[@id="quote-header-info"]/div[2]/div[1]/div[1]/h1'
    )
    if not headings:
        return None
    return str(headings[0].text)


def extract_tickers(**context):
    """Resolve every symbol in ``./fromlocal/EQUITY_L.csv`` on Yahoo Finance and
    insert the resulting ticker and company name into ``tickers1``.

    Used as a PythonOperator callable. ``**context`` accepts (and ignores) the
    task context that Airflow 1.x passes as keyword arguments when
    ``provide_context=True``; without it the call raises ``TypeError``.

    All rows are committed in one transaction at the end. The database
    connection and the remote Selenium session are released even if a step
    fails.
    """
    conn = psycopg2.connect(dbname='postgres', user='airflow', password='airflow', host='postgres')
    try:
        # Open the CSV before starting a browser session so a missing file
        # fails fast without allocating a remote Selenium node.
        with conn.cursor() as cur, open(r'./fromlocal/EQUITY_L.csv', newline='') as read_obj:
            driver = webdriver.Remote("http://selenium:4444/wd/hub", options=opt)
            try:
                driver.get("https://finance.yahoo.com")
                for row in DictReader(read_obj):
                    header_text = _lookup_symbol(driver, row['SYMBOL'])
                    if header_text is None:
                        print("no quote heading found for symbol: " + row['SYMBOL'])
                        continue
                    print("comapny name: " + header_text)

                    parsed = _parse_quote_header(header_text)
                    if parsed is None:
                        print("could not parse ticker from: " + header_text)
                        continue
                    ticker, company_name = parsed
                    print("extracted ticker: " + ticker)

                    # Parameterised query: psycopg2 does the quoting, so names
                    # containing apostrophes need no manual escaping and the
                    # scraped text cannot alter the SQL.
                    cur.execute(
                        "INSERT INTO tickers1 (keyword,companyName) values (%s, %s)",
                        (ticker, company_name),
                    )
            finally:
                # Always release the remote browser session.
                driver.quit()
        conn.commit()
    finally:
        conn.close()
# Runs at module level, i.e. every time this DAG file is parsed.
print(script_dir_path)

# The callable does not use the task context, so provide_context is not set:
# on Airflow 1.x it injects the context as keyword arguments (a TypeError for
# a callable without **kwargs), and on Airflow 2.x it is deprecated and ignored.
Yahoo_Finance = PythonOperator(
    task_id='extract_tickers',
    python_callable=extract_tickers,
    dag=dag,
)
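`PostgresOperator` 只能运行 SQL。您的代码读取 CSV、查询网站（Yahoo Finance），并将结果加载到数据库中——这些都无法用 `PostgresOperator` 做到。不过，同一个 DAG 中完全可以混用不同的运算符，例如用 `PythonOperator` 抓取数据，再用 `PostgresOperator` 执行纯 SQL 步骤。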
您可以做的是用 `PostgresHook` 替换对 `psycopg2` 的直接调用。
Hook 是对 `psycopg2` 的封装，向您提供可以直接调用的方法。例如，这意味着您无需自己处理如何连接到 Postgres：只需在 Admin -> Connections 中定义连接，然后在 Hook 中引用该连接的名称（conn_id）：
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_tickers():
    """Execute SQL on the Airflow connection whose conn_id is ``postgres_default``.

    The connection is obtained through ``PostgresHook`` instead of a direct
    ``psycopg2.connect`` call, so no credentials appear in the DAG code.
    """
    from contextlib import closing

    hook = PostgresHook(postgres_conn_id="postgres_default")
    # closing() is what actually closes the connection: psycopg2's
    # ``with conn:`` only commits (or rolls back on error) the transaction and
    # leaves the connection open.
    with closing(hook.get_conn()) as conn:
        with conn:
            with conn.cursor() as cur:
                cur.execute("Your SQL CODE")
要了解 Hook 提供的其他方法，请查看该 Hook 的源代码。