使用 S3ToRedshiftOperator 执行 truncate redshift table 时出错

Getting error while doing truncate redshift table using S3ToRedshiftOperator

我想在将 CSV 文件加载到 Redshift table 之前截断我的 Redshift table。

错误: airflow.exceptions.AirflowException:向 S3ToRedshiftOperator 传递了无效参数(task_id:dag_run_s3_to_redshift)。无效的参数是: **kwargs:{'method':'REPLACE'}

下面的代码:

task_fail_s3_to_redshift =  S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key="{{ti.xcom_pull(task_ids='export_db',key='FILE_PATH_1')}}",
        schema="dw_stage",
        table="task_fail",
        copy_options=['csv',"IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        method='REPLACE',
        task_id='task_fail_s3_to_redshift',
    ) 

 start >> task_fail_s3_to_redshift >> end 

method 参数已添加到 PR 中可用:

apache-airflow-providers-amazon >= 2.4.0

您遇到的错误意味着您使用的是旧版本的亚马逊提供商,这就是它对您不起作用的原因。

您的选择是:

1.Upgrade 提供商

pip install apache-airflow-providers-amazon --upgrade

2.If 升级不是一个选项然后使用已弃用的 truncate_table 参数:

task_fail_s3_to_redshift =  S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key="{{ti.xcom_pull(task_ids='export_db',key='FILE_PATH_1')}}",
        schema="dw_stage",
        table="task_fail",
        copy_options=['csv',"IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        truncate_table=True,
        task_id='task_fail_s3_to_redshift',
    ) 

因为您需要截断选项 - 它会为您提供相同的功能。