Convert a Python script to an Airflow DAG

I have found the following script very useful for anyone running Amazon Redshift:

#!/usr/bin/env python
from __future__ import print_function

'''
analyze-vacuum-schema.py
* Copyright 2015, Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
The Redshift Analyze Vacuum Utility gives you the ability to automate VACUUM and ANALYZE operations.
When run, it will analyze or vacuum an entire schema or individual tables. This Utility Analyzes
and Vacuums table(s) in a Redshift Database schema, based on certain parameters like unsorted,
stats off and size of the table and system alerts from stl_explain & stl_alert_event_log.
By turning on/off '--analyze-flag' and  '--vacuum-flag' parameters, you can run it as  'vacuum-only'
or  'analyze-only' utility. This script can be scheduled to run VACUUM and ANALYZE as part of
regular maintenance/housekeeping activities, when there are less database activities (quiet period).
This script will:
   1) Analyze a single table or tables in a schema based on,
        a) Alerts from stl_explain & stl_alert_event_log.
        b) 'stats off' metrics from SVV_TABLE_INFO.
   2) Vacuum a single table or tables in a schema based on,
        a) The alerts from stl_alert_event_log.
        b) The 'unsorted' and 'size' metrics from SVV_TABLE_INFO.
        c) Vacuum reindex to analyze the interleaved sort keys
Srinikri Amazon Web Services (2015)
11/21/2015 : Added support for vacuum reindex to analyze the interleaved sort keys.
09/01/2017 : Fixed issues with interleaved sort key tables per https://github.com/awslabs/amazon-redshift-utils/issues/184
11/09/2017 : Refactored to support running in AWS Lambda
14/12/2017 : Refactored to support a more sensible interface style with kwargs
'''
import os
import sys
import argparse

# add the lib directory to the sys path
try:
    sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
    sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
except:
    pass

import getopt
import analyze_vacuum
import config_constants

__version__ = ".9.2.1"

OK = 0
ERROR = 1
INVALID_ARGS = 2
NO_WORK = 3
TERMINATED_BY_USER = 4
NO_CONNECTION = 5

# setup cli args
parser = argparse.ArgumentParser()
parser.add_argument("--analyze-flag", dest="analyze_flag", default=True, type=bool,
                    help="Flag to turn ON/OFF ANALYZE functionality (True or False : Default = True ")
parser.add_argument("--max-unsorted-pct", dest="max_unsorted_pct",
                    help="Maximum unsorted percentage(% to consider a table for vacuum : Default = 50%")
parser.add_argument("--min-interleaved-cnt", dest="min_interleaved_cnt", type=int,
                    help="Minimum stv_interleaved_counts records to consider a table for vacuum reindex: Default = 0")
parser.add_argument("--min-interleaved-skew", dest="min_interleaved_skew",
                    help="Minimum index skew to consider a table for vacuum reindex: Default = 1.4")
parser.add_argument("--min-unsorted-pct", dest="min_unsorted_pct",
                    help="Minimum unsorted percentage(% to consider a table for vacuum : Default = 5%")
parser.add_argument("--stats-off-pct", dest="stats_off_pct",
                    help="Minimum stats off percentage(% to consider a table for analyze : Default = 10%")
parser.add_argument("--table-name", dest="table_name",
                    help="A specific table to be Analyzed or Vacuumed if analyze-schema is not desired")
parser.add_argument("--vacuum-flag", dest="vacuum_flag", default=True, type=bool,
                    help="Flag to turn ON/OFF VACUUM functionality (True or False :  Default = True")
parser.add_argument("--vacuum-parameter", dest="vacuum_parameter",
                    help="Vacuum parameters [ FULL | SORT ONLY | DELETE ONLY | REINDEX ] Default = FULL")
parser.add_argument("--blacklisted-tables", dest="blacklisted_tables", help="The tables we do not want to Vacuum")
parser.add_argument("--db-conn-opts", dest="db_conn_opts",
                    help="Additional connection options. name1=opt1[ name2=opt2]..")
parser.add_argument("--db-host", dest="db_host", required=True, help="The Cluster endpoint")
parser.add_argument("--db-port", dest="db_port", type=int, required=True,
                    help="The Cluster endpoint port : Default = 5439")
parser.add_argument("--db-pwd", dest="db_pwd", help="The Password for the Database User to connect to")
parser.add_argument("--db-user", dest="db_user", required=True, help="The Database User to connect to")
parser.add_argument("--debug", dest="debug", default=False,
                    help="Generate Debug Output including SQL Statements being run")
parser.add_argument("--ignore-errors", dest="ignore_errors", default=True,
                    help="Ignore errors raised when running and continue processing")
parser.add_argument("--max-table-size-mb", dest="max_table_size_mb", type=int,
                    help="Maximum table size in MB : Default = 700*1024 MB")
parser.add_argument("--output-file", dest="output_file", help="The full path to the output file to be generated")
parser.add_argument("--predicate-cols", dest="predicate_cols", help="Analyze predicate columns only")
parser.add_argument("--query-group", dest="query_group", help="Set the query_group for all queries")
parser.add_argument("--require-ssl", dest="require_ssl", default=False,
                    help="Does the connection require SSL? (True | False")
parser.add_argument("--schema-name", dest="schema_name",
                    help="The Schema to be Analyzed or Vacuumed (REGEX: Default = public")
parser.add_argument("--slot-count", dest="slot_count", help="Modify the wlm_query_slot_count : Default = 1")
parser.add_argument("--suppress-cloudwatch", dest="suppress_cw",
                    help="Don't emit CloudWatch metrics for analyze or vacuum when set to True")
parser.add_argument("--db", dest="db", help="The Database to Use")
full_args = parser.parse_args()
parse_args = {}
# remove args that end up as None
for k, v in vars(full_args).items():
    if v is not None:
        parse_args[k] = v


def main(argv):
    # get environmental args
    args = {config_constants.DB_NAME: os.environ.get('PGDATABASE', None),
            config_constants.DB_USER: os.environ.get('PGUSER', None),
            config_constants.DB_PASSWORD: os.environ.get('PGPASSWORD', None),
            config_constants.DB_HOST: os.environ.get('PGHOST', None),
            config_constants.DB_PORT: os.environ.get('PGPORT', 5439)}

    # add argparse args
    args.update(parse_args)

    if args.get(config_constants.OUTPUT_FILE) is not None:
        sys.stdout = open(args.get(config_constants.OUTPUT_FILE), 'w')

    # invoke the main method of the utility
    result = analyze_vacuum.run_analyze_vacuum(**args)

    if result is not None:
        sys.exit(result)
    else:
        sys.exit(0)


if __name__ == "__main__":
    main(sys.argv)

However, the only "easy" way to schedule it is with cron on EC2 or a similar scheduler. So I have been trying to find a way to run it as an Airflow DAG.

I found two similar questions on Stack Overflow, and I suspect the script may just be missing some import statements, such as:

from airflow import DAG
from airflow.models import Variable

I am hoping that someone familiar enough with Airflow can help me find the required entries or point me in the right direction.

If so, I can create a fork of the original script:

https://github.com/awslabs/amazon-redshift-utils/blob/master/src/AnalyzeVacuumUtility/analyze-vacuum-schema.py

That would help others achieve the same goal in the future.

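How about creating a new custom operator? It should accept all the CLI arguments, which you can then pass to the code from the existing script. Here is a rough draft of what I would do: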

import os
import sys

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator

# Modules from the amazon-redshift-utils repo; they must be importable
# on the Airflow workers (see the sys.path changes in the original script).
import analyze_vacuum
import config_constants


class AnalyzeVacuumOperator(BaseOperator):
    def __init__(
        self,
        *,
        analyze_flag: bool = True,
        max_unsorted_pct: float = None,
        # ... all the other arguments from argparse go here
        **kwargs
    ):
        super().__init__(**kwargs)
        self.args = {
            config_constants.DB_NAME: os.environ.get('PGDATABASE', None),
            # The rest of the args defined in the original script go here
            # ...
            # Then we add the arguments configured by users,
            # which are currently passed via the CLI (argparse).
            # TODO: check the current key names so that we match them
            "analyze_flag": analyze_flag,
            "max_unsorted_pct": max_unsorted_pct,
            # ...
        }

    def execute(self, context):
        args = self.args

        # The rest is copied from the existing script

        if args.get(config_constants.OUTPUT_FILE) is not None:
            sys.stdout = open(args.get(config_constants.OUTPUT_FILE), 'w')

        # invoke the main method of the utility
        result = analyze_vacuum.run_analyze_vacuum(**args)

        # The original script passes the result to sys.exit(). Inside an
        # operator, raise on a non-zero code instead, so the task fails
        # through Airflow's normal error handling.
        if result:
            raise AirflowException(f"analyze_vacuum returned error code {result}")
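
The draft leaves the argument dictionary as a TODO. One possible way to build it, mirroring what the original script does (environment defaults first, then user-supplied values, skipping anything that is None), is sketched below. The names `ENV_DEFAULTS` and `build_args` are hypothetical, and the string keys are an assumption based on the argparse `dest` values; the real key names are defined in `config_constants` and should be checked against it.

```python
import os

# Assumed key names (taken from the argparse `dest` values in the script).
# The authoritative names live in config_constants; verify before use.
ENV_DEFAULTS = {
    "db": ("PGDATABASE", None),
    "db_user": ("PGUSER", None),
    "db_pwd": ("PGPASSWORD", None),
    "db_host": ("PGHOST", None),
    "db_port": ("PGPORT", 5439),
}


def build_args(overrides, environ=None):
    """Merge environment defaults with explicitly passed arguments.

    Values in `overrides` that are None are dropped, just as the original
    script removes argparse options that were not supplied.
    """
    environ = os.environ if environ is None else environ
    # Start from the environment, as main() does in the original script.
    args = {
        key: environ.get(var, default)
        for key, (var, default) in ENV_DEFAULTS.items()
    }
    # Explicit arguments win over environment values, unless they are None.
    args.update({k: v for k, v in overrides.items() if v is not None})
    return args
```

In the operator, `__init__` could then collect its keyword arguments into a dict and store `build_args(that_dict)` as `self.args`, so unset options never reach `run_analyze_vacuum` as None.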

Then you can use the operator in your DAG like this:

with DAG(...) as dag:
  AnalyzeVacuumOperator(
    task_id="vacuum_task", 
    analyze_flag=True, 
    slot_count=3,
  )

Note that you may need to adjust the imports and the path changes, as the original script does.
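
As a rough illustration of the path changes, the original script appends its `lib` and parent directories to `sys.path` before importing `analyze_vacuum`. A small helper along these lines could do the same from a DAG or plugin file. The name `add_to_sys_path` is hypothetical, and the directory layout is an assumption about where the amazon-redshift-utils code is placed in your deployment.

```python
import os
import sys


def add_to_sys_path(base_dir, subdirs=("lib", "..")):
    """Append base_dir/<subdir> to sys.path, skipping entries already present.

    Returns the list of paths that were actually added.
    """
    added = []
    for sub in subdirs:
        path = os.path.abspath(os.path.join(base_dir, sub))
        if path not in sys.path:
            sys.path.append(path)
            added.append(path)
    return added
```

It would be called as `add_to_sys_path(os.path.dirname(__file__))` before the `import analyze_vacuum` line, assuming the utility's modules sit in those directories relative to the file.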