在卸载过程中从 Redshift 获取行数并计算 S3 中加载的行数
Getting row counts from Redshift during unload process and counting rows loaded in S3
我的 python
代码如下所示,我将数据从 Redshift
卸载到 Amazon S3
存储桶。我正在尝试从 Redshift
和 S3 bucket
获取行数以确保加载所有数据。此外,我还想从 S3 存储桶中获取上次上传日期,以便我知道上次执行卸载的时间。请建议带有解释的代码。
提前感谢您的时间和努力!
import csv
import redshift_connector
import sys
CSV_FILE="Tables.csv"
CSV_DELIMITER=';'
S3_DEST_PATH="s3://..../"
DB_HOST="MY HOST"
DB_PORT=1234
DB_DB="MYDB"
DB_USER="MY_READ"
DB_PASSWORD="MY_PSWD"
IM_ROLE="arn:aws:iam::/redshift-role/unload data","arn:aws::iam::/write in bucket"
def get_tables(path):
tables=[]
with open (path, 'r') as file:
csv_reader = csv.reader (file,delimiter=CSV_DELIMITER)
header = next(csv_reader)
if header != None:
for row in csv_reader:
tables.append(row)
return tables
def unload(conn, tables, s3_path):
cur = conn.cursor()
for table in tables:
print(f">{table[0]}.{table[1]}")
try:
query= f'''unload('select * from {table[0]}.{table[1]}' to '{s3_path}/{table[1]}/'
iam_role '{IAM_ROLE}'
CSV
PARALLEL FALSE
CLEANPATH;'''
print(f"loading in progress")
cur.execute(query)
print(f"Done.")
except Esception as e:
print("Failed to load")
print(str(e))
sys.exit(1)
cur.close()
def main():
try:
conn = redshift_connector.connect(
host=DB_HOST,
port=DB_PORT,
database= DB_DB,
user= DB_USER,
password=DB_PASSWORD
)
tables = get_tables(CSV_FILE)
unload(conn,tables,S3_DEST_PATH)
conn.close()
except Exception as e:
print(e)
sys.exit(1)
根据 SO 用户的评论更新代码
tables=['schema1.tablename','schema2.table2']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
cur.execute ('select count(*) from {',' .join("'"+y+"'" for y in tables)}')
results=cur.fetchall()
print("The table {} contained".format(tables[0]),*result[0],"rows"+"\n" ) #Printing row counts along with table names
cur.close()
conn.close()
第二次更新:
表=['schema1.tablename','schema2.table2']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
results=cur.fetchone()
for row in result:
print("The table {} contained".format(tables[0]),result[0],"rows"+"\n" ) #Printing row counts along with table names
获取行数的简单查询是
query = "select count(*) from {table_name}"
对于 Redshift,您需要做的就是
cur.execute(query)
row_count = cur.fetchall()
使用 boto3
,您也可以使用类似的 SQL 查询来获取 S3 行数,如 答案中所述。
编辑:
稍微更正了您的更新方法:
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
result=cur.fetchone()
count = result[0] if result else 0
print(f"The table {table} contained {count} rows.\n" )
我的 python
代码如下所示,我将数据从 Redshift
卸载到 Amazon S3
存储桶。我正在尝试从 Redshift
和 S3 bucket
获取行数以确保加载所有数据。此外,我还想从 S3 存储桶中获取上次上传日期,以便我知道上次执行卸载的时间。请建议带有解释的代码。
提前感谢您的时间和努力!
import csv
import redshift_connector
import sys
CSV_FILE="Tables.csv"
CSV_DELIMITER=';'
S3_DEST_PATH="s3://..../"
DB_HOST="MY HOST"
DB_PORT=1234
DB_DB="MYDB"
DB_USER="MY_READ"
DB_PASSWORD="MY_PSWD"
IM_ROLE="arn:aws:iam::/redshift-role/unload data","arn:aws::iam::/write in bucket"
def get_tables(path):
tables=[]
with open (path, 'r') as file:
csv_reader = csv.reader (file,delimiter=CSV_DELIMITER)
header = next(csv_reader)
if header != None:
for row in csv_reader:
tables.append(row)
return tables
def unload(conn, tables, s3_path):
cur = conn.cursor()
for table in tables:
print(f">{table[0]}.{table[1]}")
try:
query= f'''unload('select * from {table[0]}.{table[1]}' to '{s3_path}/{table[1]}/'
iam_role '{IAM_ROLE}'
CSV
PARALLEL FALSE
CLEANPATH;'''
print(f"loading in progress")
cur.execute(query)
print(f"Done.")
except Esception as e:
print("Failed to load")
print(str(e))
sys.exit(1)
cur.close()
def main():
try:
conn = redshift_connector.connect(
host=DB_HOST,
port=DB_PORT,
database= DB_DB,
user= DB_USER,
password=DB_PASSWORD
)
tables = get_tables(CSV_FILE)
unload(conn,tables,S3_DEST_PATH)
conn.close()
except Exception as e:
print(e)
sys.exit(1)
根据 SO 用户的评论更新代码
tables=['schema1.tablename','schema2.table2']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
cur.execute ('select count(*) from {',' .join("'"+y+"'" for y in tables)}')
results=cur.fetchall()
print("The table {} contained".format(tables[0]),*result[0],"rows"+"\n" ) #Printing row counts along with table names
cur.close()
conn.close()
第二次更新:
表=['schema1.tablename','schema2.table2']
conn=redshift_connector.connect(
host='my_host',
port= "my_port",
database='my_db'
user="user"
password='password')
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
results=cur.fetchone()
for row in result:
print("The table {} contained".format(tables[0]),result[0],"rows"+"\n" ) #Printing row counts along with table names
获取行数的简单查询是
query = "select count(*) from {table_name}"
对于 Redshift,您需要做的就是
cur.execute(query)
row_count = cur.fetchall()
使用 boto3
,您也可以使用类似的 SQL 查询来获取 S3 行数,如
编辑:
稍微更正了您的更新方法:
cur=conn.cursor()
for table in tables:
cur.execute(f'select count(*) from {table};')
result=cur.fetchone()
count = result[0] if result else 0
print(f"The table {table} contained {count} rows.\n" )