Airflow Exception while trying to store output of a SQL query from Redshift to S3 using RedshiftToS3Operator

I have created a redshift connection in Airflow, as shown in the screenshot. I then imported the RedshiftToS3Operator into my DAG to run a Redshift query and store the resulting CSV in S3.

from datetime import timedelta, datetime
import pytz
import airflow  
from airflow import DAG  
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable
from airflow.hooks.base import BaseHook

redshift_conn = BaseHook.get_connection('redshift_qa')

env_name = Variable.get("deploy_environment")

default_args = {  
    'owner': 'Abhra',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ["john@email.com"],
    'email_on_failure': True,
    'email_on_retry': True
}

dag = DAG(  
    'TS-Redshift-to-S3',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=4),
    schedule_interval='0 9 * * *'
)

begin_DAG = DummyOperator(task_id='begin_DAG', dag=dag)
stop_DAG = DummyOperator(task_id='stop_DAG', dag=dag)
S3_BUCKET_NAME = "test-bucket"
S3_KEY = "redshift-to-s3.csv"
QUERY = "select * from public.sample_table"

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query = QUERY,
    conn_id = redshift_conn,
    dag=dag
)


begin_DAG >> redshift_s3_operator >> stop_DAG

I am getting the following error in the Airflow webserver:

airflow.exceptions.AirflowException: Invalid arguments were passed to RedshiftToS3Operator (task_id: transfer_redshift_to_s3). Invalid arguments were:
**kwargs: {'conn_id': 'redshift_qa'}

All I want to do is import the connection I created in the Airflow UI into this DAG. I looked into this, but I am using Amazon Web Services as the Conn Type. What changes do I need to make in this DAG?

Screenshot: Redshift Connection added in Airflow

RedshiftToS3Operator does not have a parameter conn_id. You are looking for redshift_conn_id.

Also, pass the name of the connection (its connection ID string), not the Connection object returned by BaseHook.get_connection:

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    redshift_conn_id='redshift_qa',  # the connection ID string, not a Connection object
    dag=dag
)
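
With redshift_conn_id passed as a string, the redshift_conn = BaseHook.get_connection('redshift_qa') line at the top of the DAG is no longer needed; the operator resolves the connection itself at runtime.

One more thing to watch: by default, the UNLOAD statement this operator issues writes pipe-delimited text to S3, not CSV. Below is a minimal sketch of the same task that requests CSV output through the operator's unload_options parameter; the option strings are plain Redshift UNLOAD options passed through verbatim, and the task_id here is made up for illustration:

# Same bucket, key, and query as above; unload_options entries are
# appended verbatim to the UNLOAD statement Redshift executes.
redshift_s3_csv = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3_csv',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    redshift_conn_id='redshift_qa',
    unload_options=[
        'CSV',             # comma-separated output instead of the pipe-delimited default
        'HEADER',          # write column names as the first row
        'PARALLEL OFF',    # produce a single output file instead of one per slice
        'ALLOWOVERWRITE',  # overwrite the object if it already exists
    ],
    dag=dag,
)

Note that Redshift's UNLOAD appends a part suffix to the key (for example redshift-to-s3.csv000), so the object will not land at exactly redshift-to-s3.csv.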