在卸载过程中从 Redshift 获取行数并计算 S3 中加载的行数

Getting row counts from Redshift during unload process and counting rows loaded in S3

我的 python 代码如下所示,我将数据从 Redshift 卸载到 Amazon S3 存储桶。我正在尝试从 Redshift 和 S3 bucket 获取行数,以确保所有数据都已加载。此外,我还想从 S3 存储桶中获取上次上传日期,以便知道上次执行卸载的时间。请提供带有解释的代码建议。

提前感谢您的时间和努力!

import csv
import redshift_connector
import sys

# --- unload configuration -------------------------------------------------
CSV_FILE = "Tables.csv"        # list of schema;table pairs to unload
CSV_DELIMITER = ';'
S3_DEST_PATH = "s3://..../"    # destination bucket/prefix for UNLOAD
DB_HOST = "MY HOST"
DB_PORT = 1234
DB_DB = "MYDB"
DB_USER = "MY_READ"
DB_PASSWORD = "MY_PSWD"
# Bug fix: this was spelled IM_ROLE while the UNLOAD statement interpolates
# IAM_ROLE, and the trailing comma made it a 2-tuple of strings, which would
# render as "('arn...', 'arn...')" inside the SQL. Redshift accepts chained
# roles as ONE comma-separated string.
IAM_ROLE = "arn:aws:iam::/redshift-role/unload data,arn:aws::iam::/write in bucket"

def get_tables(path, delimiter=None):
    """Read (schema, table) rows from a delimited text file.

    Args:
        path: path to the CSV file; the first line is treated as a header
            and skipped.
        delimiter: field separator; defaults to the module-level
            CSV_DELIMITER when not given (backward compatible).

    Returns:
        List of rows (each a list of column strings), header excluded;
        an empty file yields an empty list.
    """
    if delimiter is None:
        delimiter = CSV_DELIMITER
    tables = []
    # newline='' is the documented way to open files for the csv module.
    with open(path, 'r', newline='') as file:
        csv_reader = csv.reader(file, delimiter=delimiter)
        # next(..., None) instead of bare next(): an empty file no longer
        # raises an uncaught StopIteration.
        header = next(csv_reader, None)
        if header is not None:
            tables.extend(csv_reader)
    return tables

def unload(conn, tables, s3_path):
    """UNLOAD every (schema, table) pair in *tables* from Redshift to S3 as CSV.

    Args:
        conn: an open redshift_connector connection.
        tables: iterable of (schema, table) sequences.
        s3_path: S3 prefix; each table is written under <s3_path>/<table>/.

    Exits the process with status 1 on the first failed UNLOAD.
    """
    cur = conn.cursor()
    try:
        for table in tables:
            print(f">{table[0]}.{table[1]}")
            try:
                # Bug fix: the inner select needs its own closing paren
                # before TO — the original left the unload(...) unclosed.
                # PARALLEL FALSE -> single output file; CLEANPATH removes
                # previous objects under the destination prefix.
                query = f'''unload ('select * from {table[0]}.{table[1]}')
                to '{s3_path}/{table[1]}/'
                iam_role '{IAM_ROLE}'
                CSV
                PARALLEL FALSE
                CLEANPATH;'''
                print("loading in progress")
                cur.execute(query)
                print("Done.")
            except Exception as e:  # bug fix: was misspelled "Esception"
                print("Failed to load")
                print(str(e))
                sys.exit(1)
    finally:
        # Bug fix: cur.close() sat inside the for-loop body, so the cursor
        # was closed after the first table and later iterations failed.
        cur.close()
        

def main():
    """Entry point: connect to Redshift and unload all configured tables.

    Exits with status 1 on any error; the connection is always closed.
    """
    conn = None
    try:
        conn = redshift_connector.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_DB,
            user=DB_USER,
            password=DB_PASSWORD,
        )
        tables = get_tables(CSV_FILE)
        unload(conn, tables, S3_DEST_PATH)
    except Exception as e:
        print(e)
        sys.exit(1)
    finally:
        # Bug fix: conn.close() was only reached on success, leaking the
        # connection on the error path. finally runs even through sys.exit.
        if conn is not None:
            conn.close()

根据 SO 用户的评论更新代码

tables = ['schema1.tablename', 'schema2.table2']

conn = redshift_connector.connect(
    host='my_host',
    port=5439,            # bug fix: port must be an int, not the string "my_port"
    database='my_db',     # bug fix: keyword arguments were missing commas
    user="user",
    password='password')

cur = conn.cursor()
# Bug fix: the original tried to splice all table names into a single
# "select count(*) from 'a','b'" statement, which is both a Python syntax
# error and invalid SQL. Run one count query per table instead.
for table in tables:
    cur.execute(f'select count(*) from {table};')
    result = cur.fetchone()    # bug fix: was assigned to "results" but read as "result"
    # Printing row counts along with table names (print the current table,
    # not always tables[0]).
    print("The table {} contained".format(table), result[0], "rows" + "\n")

cur.close()
conn.close()

第二次更新:

tables = ['schema1.tablename', 'schema2.table2']   # fix: variable name had been garbled to "表"

conn = redshift_connector.connect(
    host='my_host',
    port=5439,            # bug fix: port must be an int, not the string "my_port"
    database='my_db',     # bug fix: keyword arguments were missing commas
    user="user",
    password='password')

cur = conn.cursor()
for table in tables:
    cur.execute(f'select count(*) from {table};')
    result = cur.fetchone()    # bug fix: was assigned to "results" but read as "result"
    # Bug fix: fetchone() already returns a single row tuple, so no inner
    # "for row in result" loop is needed; also print the current table
    # instead of always tables[0].
    # Printing row counts along with table names
    print("The table {} contained".format(table), result[0], "rows" + "\n")

获取行数的简单查询是

# NOTE: as written, the braces are NOT interpolated — this is a plain string
# template. Prefix it with f (f"select count(*) from {table_name}") or call
# query.format(table_name=...) before executing it.
query = "select count(*) from {table_name}"

对于 Redshift,您需要做的就是

cur.execute(query)
# NOTE(review): fetchall() returns a list of row tuples, e.g. [(42,)], so
# row_count here is not a bare integer — use cur.fetchone()[0] for that.
row_count = cur.fetchall()

使用 boto3,您也可以使用类似的 SQL 查询来获取 S3 中的行数,如相关答案中所述。

编辑:
稍微更正了您的更新方法:

# Count the rows of every table and report each count.
cur = conn.cursor()
for table in tables:
    cur.execute(f'select count(*) from {table};')
    result = cur.fetchone()
    # fetchone() may return None if the query produced no row; report 0 then.
    if result:
        count = result[0]
    else:
        count = 0
    print(f"The table {table} contained {count} rows.\n")