将大量数据从一个 Redshift table 卸载到另一个的策略?
Strategies for unloading massive amounts of data from one Redshift table to another?
我的公司每个月都会收集大量关于我们服务器使用情况的数据(大约 100 亿行)。我的任务是将数据从初始 table 卸载到 S3
,然后我将其复制到另一个集群中的 table。然后,此数据用于 Tableau
中的仪表板报告。
我 运行 遇到卸载(以及在某种程度上复制)步骤间歇性失败并出现 Unexpected error: The server is already closed.
等错误的问题,这让我认为它实际上是超时了。还有一个奇怪的行为,它在卸载步骤中搅动并挂起,失败后我可以看到它已将所有数据和清单文件卸载到存储桶中。
由于所有这些不确定性,我不得不寻找其他可能稍微分配任务的策略。我对 Spark
非常感兴趣,目前正在使用 pyspark
了解它,并且想知道我是否可以以某种方式缓解分布式处理的问题。是否可以只将数据存储在 ec2
中并让 Tableau 从那里提取数据?有没有办法分发卸载过程?
我将在下面包含我的流程中的代码,这样如果出现我造成的瓶颈,我可以纠正它:
from datetime import datetime
import logging
import boto3
import psycopg2 as ppg2
from inst_utils import aws, misc_utils
from inst_config import config3
if __name__ == '__main__':
logger = misc_utils.initialize_logger(config3.REQUESTS_USAGE_LOGFILE)
# Unload step
timestamp = datetime.now()
month = timestamp.month
year = timestamp.year
s3_sesh = boto3.session.Session(**config3.S3_INFO)
s3 = s3_sesh.resource('s3')
fname = 'load_{}_{:02d}'.format(year, month)
bucket_url = ('canvas_logs/agg_canvas_logs_user_agent_types/'
'{}/'.format(fname))
unload_url = ('s3://{}/{}'.format(config3.S3_BUCKET, bucket_url))
s3.Bucket(config3.S3_BUCKET).put_object(Key=bucket_url)
table_name = 'requests_{}_{:02d}'.format(year, month - 1)
logger.info('Starting unload.')
try:
with ppg2.connect(**config3.REQUESTS_POSTGRES_INFO) as conn:
cur = conn.cursor()
# TODO add sql the sql folder to clean up this program.
unload = r'''
unload ('select
user_id
,course_id
,request_month
,user_agent_type
,count(session_id)
,\'DEV\' etl_requests_usage
,CONVERT_TIMEZONE(\'MST\', getdate()) etl_datetime_local
,\'agg_canvas_logs_user_agent_types\' etl_transformation_name
,\'N/A\' etl_pdi_version
,\'N/A\' etl_pdi_build_version
,null etl_pdi_hostname
,null etl_pdi_ipaddress
,null etl_checksum_md5
from
(select distinct
user_id
,context_id as course_id
,date_trunc(\'month\', request_timestamp) request_month
,session_id
,case
when user_agent like \'%CanvasAPI%\' then \'api\'
when user_agent like \'%candroid%\' then \'mobile_app_android\'
when user_agent like \'%iCanvas%\' then \'mobile_app_ios\'
when user_agent like \'%CanvasKit%\' then \'mobile_app_ios\'
when user_agent like \'%Windows NT%\' then \'desktop\'
when user_agent like \'%MacBook%\' then \'desktop\'
when user_agent like \'%iPhone%\' then \'mobile\'
when user_agent like \'%iPod Touch%\' then \'mobile\'
when user_agent like \'%iPad%\' then \'mobile\'
when user_agent like \'%iOS%\' then \'mobile\'
when user_agent like \'%CrOS%\' then \'desktop\'
when user_agent like \'%Android%\' then \'mobile\'
when user_agent like \'%Linux%\' then \'desktop\'
when user_agent like \'%Mac OS%\' then \'desktop\'
when user_agent like \'%Macintosh%\' then \'desktop\'
else \'other_unknown\'
end as user_agent_type
from {}
where context_type = \'Course\')
group by
user_id
,course_id
,request_month
,user_agent_type')
to '{}'
credentials 'aws_access_key_id={};aws_secret_access_key={}'
manifest
gzip
delimiter '|'
'''.format(
table_name, unload_url, config3.S3_ACCESS, config3.S3_SECRET)
cur.execute(unload)
conn.commit()
except ppg2.Error as e:
logger.critical('Error occurred during transaction: {}'.format(e))
raise Exception('{}'.format(e))
logger.info('Starting copy process.')
schema_name = 'ods_canvas_logs'
table_name = 'agg_canvas_logs_user_agent_types'
manifest_url = unload_url + 'manifest'
logger.info('Manifest url: {}'.format(manifest_url))
load = aws.RedshiftLoad(schema_name,
table_name,
manifest_url,
config3.S3_INFO,
config3.REDSHIFT_POSTGRES_INFO_PROD,
config3.REDSHIFT_POSTGRES_INFO,
safe_load=True,
truncate=True
)
load.execute()
FWIW,我认为您应该 post 另一个问题,其中包含您尝试过的 UNLOAD
的详细信息。
我发现 UNLOAD
在卸载整个 table 时 工作得更好 ,例如,不使用查询。
尝试使用要卸载的数据子集创建临时文件 table,然后 UNLOAD
整个 table,然后删除临时文件 table。
CREATE TEMP TABLE a AS SELECT b FROM c WHERE d = e;
UNLOAD (SELECT * FROM a) TO 's3://bucket' CREDENTIALS … ;
DROP TABLE a;
关于您上面的实际问题,我认为您采用这种方法不会取得多大成功。瓶颈不会是 Spark 或 Python,只是 Redshift 根本不是 设计的 来返回大量行。
我同意@Jim Nasby 的观点——GROUP BY 和 DISTINCT 是多余的,而且最有可能引起麻烦,因为它们强制 Redshift 在复制之前在单个 Leader 节点上执行整个数据集的整理。
Redshift 的 COPY 命令的巨大好处是,如果查询允许,每个节点都可以与其他节点并行卸载自己的数据。因此,如果您有 10 个节点,则所有 10 个节点都可以创建 S3 连接(多个)并开始输出数据。
在您的情况下,通过使用此 DISTINCT,您实际上禁用了它,因为所有数据都需要首先重新计算。
所以我会和其他人一起说,最好是按原样转储整个 table(会更快,对集群的负担更小),或者根据日期范围进行简单的增量上传,可能有一些其他简单的条件(比如你有 context_type = \'Course\')
。只要没有应该 运行 并行的 GROUP BY/DISTINCT/ORDER BY,并且速度非常快。
使用 Spark 不会有任何区别,它只会先通过 SQL 连接抽取数据。
我的公司每个月都会收集大量关于我们服务器使用情况的数据(大约 100 亿行)。我的任务是将数据从初始 table 卸载到 S3
,然后我将其复制到另一个集群中的 table。然后,此数据用于 Tableau
中的仪表板报告。
我 运行 遇到卸载(以及在某种程度上复制)步骤间歇性失败并出现 Unexpected error: The server is already closed.
等错误的问题,这让我认为它实际上是超时了。还有一个奇怪的行为,它在卸载步骤中搅动并挂起,失败后我可以看到它已将所有数据和清单文件卸载到存储桶中。
由于所有这些不确定性,我不得不寻找其他可能稍微分配任务的策略。我对 Spark
非常感兴趣,目前正在使用 pyspark
了解它,并且想知道我是否可以以某种方式缓解分布式处理的问题。是否可以只将数据存储在 ec2
中并让 Tableau 从那里提取数据?有没有办法分发卸载过程?
我将在下面包含我的流程中的代码,这样如果出现我造成的瓶颈,我可以纠正它:
from datetime import datetime
import logging
import boto3
import psycopg2 as ppg2
from inst_utils import aws, misc_utils
from inst_config import config3
if __name__ == '__main__':
logger = misc_utils.initialize_logger(config3.REQUESTS_USAGE_LOGFILE)
# Unload step
timestamp = datetime.now()
month = timestamp.month
year = timestamp.year
s3_sesh = boto3.session.Session(**config3.S3_INFO)
s3 = s3_sesh.resource('s3')
fname = 'load_{}_{:02d}'.format(year, month)
bucket_url = ('canvas_logs/agg_canvas_logs_user_agent_types/'
'{}/'.format(fname))
unload_url = ('s3://{}/{}'.format(config3.S3_BUCKET, bucket_url))
s3.Bucket(config3.S3_BUCKET).put_object(Key=bucket_url)
table_name = 'requests_{}_{:02d}'.format(year, month - 1)
logger.info('Starting unload.')
try:
with ppg2.connect(**config3.REQUESTS_POSTGRES_INFO) as conn:
cur = conn.cursor()
# TODO add sql the sql folder to clean up this program.
unload = r'''
unload ('select
user_id
,course_id
,request_month
,user_agent_type
,count(session_id)
,\'DEV\' etl_requests_usage
,CONVERT_TIMEZONE(\'MST\', getdate()) etl_datetime_local
,\'agg_canvas_logs_user_agent_types\' etl_transformation_name
,\'N/A\' etl_pdi_version
,\'N/A\' etl_pdi_build_version
,null etl_pdi_hostname
,null etl_pdi_ipaddress
,null etl_checksum_md5
from
(select distinct
user_id
,context_id as course_id
,date_trunc(\'month\', request_timestamp) request_month
,session_id
,case
when user_agent like \'%CanvasAPI%\' then \'api\'
when user_agent like \'%candroid%\' then \'mobile_app_android\'
when user_agent like \'%iCanvas%\' then \'mobile_app_ios\'
when user_agent like \'%CanvasKit%\' then \'mobile_app_ios\'
when user_agent like \'%Windows NT%\' then \'desktop\'
when user_agent like \'%MacBook%\' then \'desktop\'
when user_agent like \'%iPhone%\' then \'mobile\'
when user_agent like \'%iPod Touch%\' then \'mobile\'
when user_agent like \'%iPad%\' then \'mobile\'
when user_agent like \'%iOS%\' then \'mobile\'
when user_agent like \'%CrOS%\' then \'desktop\'
when user_agent like \'%Android%\' then \'mobile\'
when user_agent like \'%Linux%\' then \'desktop\'
when user_agent like \'%Mac OS%\' then \'desktop\'
when user_agent like \'%Macintosh%\' then \'desktop\'
else \'other_unknown\'
end as user_agent_type
from {}
where context_type = \'Course\')
group by
user_id
,course_id
,request_month
,user_agent_type')
to '{}'
credentials 'aws_access_key_id={};aws_secret_access_key={}'
manifest
gzip
delimiter '|'
'''.format(
table_name, unload_url, config3.S3_ACCESS, config3.S3_SECRET)
cur.execute(unload)
conn.commit()
except ppg2.Error as e:
logger.critical('Error occurred during transaction: {}'.format(e))
raise Exception('{}'.format(e))
logger.info('Starting copy process.')
schema_name = 'ods_canvas_logs'
table_name = 'agg_canvas_logs_user_agent_types'
manifest_url = unload_url + 'manifest'
logger.info('Manifest url: {}'.format(manifest_url))
load = aws.RedshiftLoad(schema_name,
table_name,
manifest_url,
config3.S3_INFO,
config3.REDSHIFT_POSTGRES_INFO_PROD,
config3.REDSHIFT_POSTGRES_INFO,
safe_load=True,
truncate=True
)
load.execute()
FWIW,我认为您应该 post 另一个问题,其中包含您尝试过的 UNLOAD
的详细信息。
我发现 UNLOAD
在卸载整个 table 时 工作得更好 ,例如,不使用查询。
尝试使用要卸载的数据子集创建临时文件 table,然后 UNLOAD
整个 table,然后删除临时文件 table。
CREATE TEMP TABLE a AS SELECT b FROM c WHERE d = e;
UNLOAD (SELECT * FROM a) TO 's3://bucket' CREDENTIALS … ;
DROP TABLE a;
关于您上面的实际问题,我认为您采用这种方法不会取得多大成功。瓶颈不会是 Spark 或 Python,只是 Redshift 根本不是 设计的 来返回大量行。
我同意@Jim Nasby 的观点——GROUP BY 和 DISTINCT 是多余的,而且最有可能引起麻烦,因为它们强制 Redshift 在复制之前在单个 Leader 节点上执行整个数据集的整理。
Redshift 的 COPY 命令的巨大好处是,如果查询允许,每个节点都可以与其他节点并行卸载自己的数据。因此,如果您有 10 个节点,则所有 10 个节点都可以创建 S3 连接(多个)并开始输出数据。
在您的情况下,通过使用此 DISTINCT,您实际上禁用了它,因为所有数据都需要首先重新计算。
所以我会和其他人一起说,最好是按原样转储整个 table(会更快,对集群的负担更小),或者根据日期范围进行简单的增量上传,可能有一些其他简单的条件(比如你有 context_type = \'Course\')
。只要没有应该 运行 并行的 GROUP BY/DISTINCT/ORDER BY,并且速度非常快。
使用 Spark 不会有任何区别,它只会先通过 SQL 连接抽取数据。