Pet example DAG fails at populate_pet_table and get_birth_date
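I am new to Airflow. I set everything up following the official documentation. I am using the pet example DAG, but when I look at the DAG logs, they show the following errors:
First error, from populate_pet_table: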
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "Maxy"
LINE 2: INSERT INTO pet VALUES ('Maxy', 'Dog', '2018-07-...
Second error, from get_birth_date:
BETWEEN SYMMETRIC 2020-01-01 AND 2020-12-31;
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
What is wrong? This is the official example, so I expected it to work as is. Here is the DAG code:
import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator
with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2020, 2, 2),
    schedule_interval="@once",
    catchup=False,
) as dag:
    create_pet_table = PostgresOperator(
        task_id="create_pet_table",
        postgres_conn_id="postgres_default",
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
            pet_id SERIAL PRIMARY KEY,
            name VARCHAR NOT NULL,
            pet_type VARCHAR NOT NULL,
            birth_date DATE NOT NULL,
            OWNER VARCHAR NOT NULL);
          """,
    )
    populate_pet_table = PostgresOperator(
        task_id="populate_pet_table",
        postgres_conn_id="postgres_default",
        sql="""
            INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
            INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
            INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
            INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
            """,
    )
    get_all_pets = PostgresOperator(
        task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
    )
    get_birth_date = PostgresOperator(
        task_id="get_birth_date",
        postgres_conn_id="postgres_default",
        sql="""
            SELECT * FROM pet
            WHERE birth_date
            BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
            """,
        params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
    )

    create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
The SQL in the example itself appears to be wrong.
Change:
INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
To:
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
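Why the column list matters: when an INSERT has no column list, Postgres assigns the values to the table's columns from left to right, so 'Max' is sent to pet_id, which is an integer SERIAL column. The sketch below only mimics that positional assignment in plain Python; it does not talk to a database, and the helper name map_values is made up for illustration.

```python
# Column order as declared in CREATE TABLE pet (...).
TABLE_COLUMNS = ["pet_id", "name", "pet_type", "birth_date", "owner"]


def map_values(values, column_list=None):
    """Pair each value with the column it would be assigned to.

    Hypothetical helper: with no column list, the values fill the
    table's columns from left to right, which is how Postgres treats
    INSERT ... VALUES without explicit columns.
    """
    targets = column_list if column_list is not None else TABLE_COLUMNS
    return dict(zip(targets, values))


row = ("Max", "Dog", "2018-07-05", "Jane")

# No column list: 'Max' lands in pet_id, an integer column.
positional = map_values(row)

# Explicit column list: pet_id is skipped and left to SERIAL.
named = map_values(row, ["name", "pet_type", "birth_date", "owner"])

print(positional)
print(named)
```

In the first mapping the name ends up under pet_id, which matches the "invalid input syntax for type integer" error in the log. In the second, pet_id is absent, so the database would generate it.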
For the second issue, change
BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
to:
BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';
With both changes it works.
See the example on sqlfiddle.
I have opened a PR to fix this in the documentation.
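On the second error: the template values are pasted into the SQL as raw text, so without quotes the database sees 2020-01-01 as integer subtraction (2020 - 1 - 1) rather than a date, and a DATE column cannot be compared with integers. The sketch below uses a simple str.replace helper as a stand-in for Airflow's Jinja rendering (an assumption for illustration, not Airflow's actual code path) to show what each version of the query looks like after substitution.

```python
params = {"begin_date": "2020-01-01", "end_date": "2020-12-31"}


def render(template, params):
    """Stand-in for Jinja rendering: fill in {{ params.key }} placeholders."""
    for key, value in params.items():
        template = template.replace("{{ params.%s }}" % key, value)
    return template


broken = "BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};"
fixed = "BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';"

# Unquoted: the bounds reach the database as bare arithmetic expressions.
rendered_broken = render(broken, params)

# Quoted with a DATE prefix: the bounds are typed date literals.
rendered_fixed = render(fixed, params)

print(rendered_broken)
print(rendered_fixed)
```

The first rendered line is the same text that appears in the get_birth_date error log, which confirms that the quotes are missing from the template rather than from the params.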