Pet example DAG fails at populate_pet_table and get_birth_date

I am new to Airflow and set everything up according to the official documentation. I am running the pet example DAG, but the task logs show the following errors:

The first error comes from populate_pet_table:

psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "Maxy"
LINE 2:             INSERT INTO pet VALUES ('Maxy', 'Dog', '2018-07-...

The second error comes from get_birth_date:

BETWEEN SYMMETRIC 2020-01-01 AND 2020-12-31;


HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

What is wrong? This is the official example, so I expected it to work as is. Here is the DAG code:

import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2020, 2, 2),
    schedule_interval="@once",
    catchup=False,
) as dag:
    create_pet_table = PostgresOperator(
        task_id="create_pet_table",
        postgres_conn_id="postgres_default",
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
            pet_id SERIAL PRIMARY KEY,
            name VARCHAR NOT NULL,
            pet_type VARCHAR NOT NULL,
            birth_date DATE NOT NULL,
            OWNER VARCHAR NOT NULL);
          """,
    )
    populate_pet_table = PostgresOperator(
        task_id="populate_pet_table",
        postgres_conn_id="postgres_default",
        sql="""
            INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
            INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
            INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
            INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
            """,
    )
    get_all_pets = PostgresOperator(
        task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
    )
    get_birth_date = PostgresOperator(
        task_id="get_birth_date",
        postgres_conn_id="postgres_default",
        sql="""
            SELECT * FROM pet
            WHERE birth_date
            BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
            """,
        params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
    )

    create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date

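The SQL itself appears to be at fault. The INSERT statements give no column list, so PostgreSQL assigns the values to the table's columns in declared order and tries to store the pet's name in the integer pet_id column.

Change: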

INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');

To:

INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
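To see why the explicit column list matters, here is a minimal Python sketch of how positional values line up with columns. It only imitates the left-to-right assignment PostgreSQL performs; `TABLE_COLUMNS` and `map_values` are hypothetical names made up for this illustration and are not part of Airflow or psycopg2.

```python
# Column order as declared in CREATE TABLE pet (names lowercased for readability).
TABLE_COLUMNS = ["pet_id", "name", "pet_type", "birth_date", "owner"]


def map_values(values, column_list=None):
    """Pair each value with the column it would be assigned to.

    Without a column list, values are matched to the table's columns
    from left to right; with one, they follow the list instead.
    """
    targets = column_list if column_list is not None else TABLE_COLUMNS
    return dict(zip(targets, values))


row = ("Max", "Dog", "2018-07-05", "Jane")

# No column list: 'Max' is paired with pet_id, which is an integer column.
without_list = map_values(row)

# Explicit column list: pet_id is skipped and left to its SERIAL default.
with_list = map_values(row, ["name", "pet_type", "birth_date", "owner"])

print(without_list)
print(with_list)
```

In the first mapping the name ends up under pet_id, which matches the "invalid input syntax for type integer" error in the log. In the second, pet_id is absent, so the database can generate it.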

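For the second error, the dates are rendered into the query without quotes, so PostgreSQL does not read them as dates. Change: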

BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};

To:

BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';

It will then work correctly. There is an example on sqlfiddle.
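The effect of the missing quotes can be sketched in plain Python. The `render` function below is a hypothetical stand-in for the Jinja substitution Airflow applies to the `sql` field, written with `re` so the snippet has no dependencies. `as_integer_arithmetic` is likewise an illustrative helper that shows what an unquoted `2020-01-01` amounts to when read as integer subtraction.

```python
import re

BROKEN = "BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};"
FIXED = "BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';"

PARAMS = {"begin_date": "2020-01-01", "end_date": "2020-12-31"}


def render(template, params):
    """Replace each {{ params.key }} placeholder with the raw parameter value."""
    pattern = r"\{\{\s*params\.(\w+)\s*\}\}"
    return re.sub(pattern, lambda match: str(params[match.group(1)]), template)


def as_integer_arithmetic(token):
    """Evaluate 'YYYY-MM-DD' as the subtraction YYYY - MM - DD."""
    year, month, day = (int(part) for part in token.split("-"))
    return year - month - day


rendered_broken = render(BROKEN, PARAMS)
rendered_fixed = render(FIXED, PARAMS)

print(rendered_broken)
print(rendered_fixed)
print(as_integer_arithmetic(PARAMS["begin_date"]), as_integer_arithmetic(PARAMS["end_date"]))
```

The first rendered line is the same text that appears in the get_birth_date log. Without quotes, each bound is an integer expression, so the query compares a DATE column with integers, and that is the likely reason PostgreSQL reports that no operator matches. The `DATE '...'` form makes each bound a typed date literal.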

I have opened a PR to fix this.