Spark/Glue:当 .count() 或在 ~20MM 记录和 1 名工人的数据帧上生成字段列表时的性能问题
Spark/Glue: performance issue when .count() or when generating fields' list on dataframe of ~20MM records and 1 worker
我正在尝试 运行 使用 AWS Glue 的简单 ETL 过程。
过程很简单:使用 JDBC 连接器从数据库中读取 20+ tables,然后将它们放入 S3。一切正常,唯一的问题是 运行 工作所需的时间(2 小时以上)。
主要瓶颈是由一些非常大的 table(16 到 2000 万条记录)造成的,而且我必须提取行数和字段列表。
粘合作业使用 Python 3 个,Spark 3 个,2 个工人(其中 1 个驱动程序)。
我第一次阅读 table:
df = sparkSession.read.format("jdbc").option("url", connection_url).option("dbtable", table).option("driver", DRIVER).load()
然后我将其转换为 GlueDynamicFrame(因为我更容易对其进行 运行 操作):
df = DynamicFrame.fromDF(df, glueContext, "df")
然后我继续计算行数:
n_rows = df.count()
痛苦的开始:对于某些 table(最大的),需要 10 到 20 分钟才能达到 return 这个值。我已经研究并(我认为)理解了 Spark 中惰性求值和计算的概念,但在我看来,这个操作无论如何都应该减少,我肯定做错了什么。无论如何,然后我继续生成一个字段列表:
fields = [df.schema().fields[x].name for x in range(0, len(df.schema().fields))]
再过 10 到 20 分钟 运行。最终,我下沉了数据框:
glueContext.write_dynamic_frame.\
from_options(frame = df,
connection_type = "s3",
connection_options = {"path": path,
"partitionKeys": [partition]},
format = "parquet")
同样,这些大 table 需要很长时间。
值得一提的是,我从数据库中提取的 tables 也包含很少的行。我提到这一点是因为我在阅读 table 后就已经阅读了重新分区的可能解决方案,但是对 3 行的 DataFrame 重新分区毫无意义。
系统地做这件事的唯一方法是先计算行数,然后根据 n_rows 重新分区,但它已经 forever.Also,我读到分区数应该是与工人数量有一定关系。我有 1 个工人,所以 1 个分区对我来说似乎合乎逻辑。
我的问题是:我做错了什么?我是否应该在阅读时增加工人数量并相应地重新分配?或者还有哪些其他解决方案可用?
非常感谢任何建议!
编辑:最终增加工作人员的数量、缓存以及使用 lower_bound/upper_bound 读取时的分区帮助很大。但对我帮助最大的是避免 df.count() 成为瘟疫。如果没有那个操作,工作持续时间会减少 80%...不知道为什么,因为我还是一个初学者,但是这个非常简单的操作不会真的那么昂贵...
我们在迁移过程中遇到了同样的挑战,并根据以下优化方法进行了优化。
优化:01
就像您提到的那样,n_rows = df.count()
是一项成本高昂的操作,请尽量避免在您的代码中使用该过程。
优化:02[生成字段列表]
我们已尝试通过示例记录 1 从源中获取架构。
src_connect_string = {'url':"jdbc:teradata://conntionstring,TMODE=TERA", 'user' : "username", 'password' : "mypassword",'query':"select * from tablename limit 1 ",'driver' :"com.teradata.jdbc.TeraDriver"}
df_td_src=spark.read.format("jdbc").options(**src_connect_string).load()
src_td_columns=df_td_src.schema.names
优化:03
找到读取过程或写入过程花费较长时间的位置。基于此,我们可以使该过程并发运行。例如,由于我们的写作过程花费的时间较长,因此我们以并发方式进行写作过程。 ref .
的示例代码
jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - {query}")
data_df = spark.read \
.format('jdbc') \
.options(url= jdbcurl, user= user,password= pw, query=query, driver= driver,numPartitions=100) \
.option('customSchema', schema[1:-1]) \
.option('ConnectionRetries', '3') \
.option('ConnectionRetryInterval', '2000') \
.option("fetchSize",100000) \
.load()
# display(data_df)
from pyspark.sql.functions import *
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
from pyspark.sql import functions as f
from pyspark.sql.functions import col
date_range = ['2017-01-28']
def writeS3(curr_date):
print(f"Starting S3 write for date - {curr_date}")
data_df1 = data_df.withColumn("date1", f.from_unixtime(f.unix_timestamp(data_df.LD_TS), "yyyy-MM-dd"))
display(data_df1)
print(curr_date)
save_df = data_df1.filter(f"date1='{curr_date}'").drop('date1')
save_df.write.parquet(f"s3://location")
jobs = []
results_done = []
total_days = 30
with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
print(f"{raw_bucket}/{db}/{table}/")
for curr_date in date_range:
print(f"Starting S3 write for date - {curr_date}")
jobs.append(e.submit(writeS3, curr_date))
# result_done = job.result()
# print(f"Job Completed - {result_done}")
print("Task complete")
在处理 16-20 百万条记录时,我肯定会增加工人的数量。您真的想利用 Spark 的并行处理能力。
另外.count()
是一个将强制Spark执行计划的动作。如果您想继续使用该 DataFrame,您应该可以使用 .cache()
来提高性能。
我相信您没有使用由 numPartitions
选项
控制的并行 JDBC 读取机制
您必须达到最佳 numPartitions
数字
- 根据分配的Executor core,一个Executor core执行一个partition。
- 将在执行器中并行执行的数据分区应完全适合内存以避免溢出。
df = spark.read. \
format("jdbc"). \
option("url", "URL"). \
option("user", "<username>"). \
option("password", "<password>"). \
option("dbtable", "<table>"). \
option("partitionColumn", "partitionColumn"). \
option("lowerBound", "<lowest partition number>"). \
option("upperBound", "<largest partition number>"). \
option("numPartitions", "<number of partitions>"). \
load()
我正在尝试 运行 使用 AWS Glue 的简单 ETL 过程。
过程很简单:使用 JDBC 连接器从数据库中读取 20+ tables,然后将它们放入 S3。一切正常,唯一的问题是 运行 工作所需的时间(2 小时以上)。
主要瓶颈是由一些非常大的 table(16 到 2000 万条记录)造成的,而且我必须提取行数和字段列表。 粘合作业使用 Python 3 个,Spark 3 个,2 个工人(其中 1 个驱动程序)。
我第一次阅读 table:
df = sparkSession.read.format("jdbc").option("url", connection_url).option("dbtable", table).option("driver", DRIVER).load()
然后我将其转换为 GlueDynamicFrame(因为我更容易对其进行 运行 操作):
df = DynamicFrame.fromDF(df, glueContext, "df")
然后我继续计算行数:
n_rows = df.count()
痛苦的开始:对于某些 table(最大的),需要 10 到 20 分钟才能达到 return 这个值。我已经研究并(我认为)理解了 Spark 中惰性求值和计算的概念,但在我看来,这个操作无论如何都应该减少,我肯定做错了什么。无论如何,然后我继续生成一个字段列表:
fields = [df.schema().fields[x].name for x in range(0, len(df.schema().fields))]
再过 10 到 20 分钟 运行。最终,我下沉了数据框:
glueContext.write_dynamic_frame.\
from_options(frame = df,
connection_type = "s3",
connection_options = {"path": path,
"partitionKeys": [partition]},
format = "parquet")
同样,这些大 table 需要很长时间。
值得一提的是,我从数据库中提取的 tables 也包含很少的行。我提到这一点是因为我在阅读 table 后就已经阅读了重新分区的可能解决方案,但是对 3 行的 DataFrame 重新分区毫无意义。
系统地做这件事的唯一方法是先计算行数,然后根据 n_rows 重新分区,但它已经 forever.Also,我读到分区数应该是与工人数量有一定关系。我有 1 个工人,所以 1 个分区对我来说似乎合乎逻辑。
我的问题是:我做错了什么?我是否应该在阅读时增加工人数量并相应地重新分配?或者还有哪些其他解决方案可用? 非常感谢任何建议!
编辑:最终增加工作人员的数量、缓存以及使用 lower_bound/upper_bound 读取时的分区帮助很大。但对我帮助最大的是避免 df.count() 成为瘟疫。如果没有那个操作,工作持续时间会减少 80%...不知道为什么,因为我还是一个初学者,但是这个非常简单的操作不会真的那么昂贵...
我们在迁移过程中遇到了同样的挑战,并根据以下优化方法进行了优化。
优化:01
就像您提到的那样,n_rows = df.count()
是一项成本高昂的操作,请尽量避免在您的代码中使用该过程。
优化:02[生成字段列表]
我们已尝试通过示例记录 1 从源中获取架构。
src_connect_string = {'url':"jdbc:teradata://conntionstring,TMODE=TERA", 'user' : "username", 'password' : "mypassword",'query':"select * from tablename limit 1 ",'driver' :"com.teradata.jdbc.TeraDriver"}
df_td_src=spark.read.format("jdbc").options(**src_connect_string).load()
src_td_columns=df_td_src.schema.names
优化:03
找到读取过程或写入过程花费较长时间的位置。基于此,我们可以使该过程并发运行。例如,由于我们的写作过程花费的时间较长,因此我们以并发方式进行写作过程。 ref .
的示例代码jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - {query}")
data_df = spark.read \
.format('jdbc') \
.options(url= jdbcurl, user= user,password= pw, query=query, driver= driver,numPartitions=100) \
.option('customSchema', schema[1:-1]) \
.option('ConnectionRetries', '3') \
.option('ConnectionRetryInterval', '2000') \
.option("fetchSize",100000) \
.load()
# display(data_df)
from pyspark.sql.functions import *
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
from pyspark.sql import functions as f
from pyspark.sql.functions import col
date_range = ['2017-01-28']
def writeS3(curr_date):
print(f"Starting S3 write for date - {curr_date}")
data_df1 = data_df.withColumn("date1", f.from_unixtime(f.unix_timestamp(data_df.LD_TS), "yyyy-MM-dd"))
display(data_df1)
print(curr_date)
save_df = data_df1.filter(f"date1='{curr_date}'").drop('date1')
save_df.write.parquet(f"s3://location")
jobs = []
results_done = []
total_days = 30
with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
print(f"{raw_bucket}/{db}/{table}/")
for curr_date in date_range:
print(f"Starting S3 write for date - {curr_date}")
jobs.append(e.submit(writeS3, curr_date))
# result_done = job.result()
# print(f"Job Completed - {result_done}")
print("Task complete")
在处理 16-20 百万条记录时,我肯定会增加工人的数量。您真的想利用 Spark 的并行处理能力。
另外.count()
是一个将强制Spark执行计划的动作。如果您想继续使用该 DataFrame,您应该可以使用 .cache()
来提高性能。
我相信您没有使用由 numPartitions
选项
您必须达到最佳 numPartitions
数字
- 根据分配的Executor core,一个Executor core执行一个partition。
- 将在执行器中并行执行的数据分区应完全适合内存以避免溢出。
df = spark.read. \ format("jdbc"). \ option("url", "URL"). \ option("user", "<username>"). \ option("password", "<password>"). \ option("dbtable", "<table>"). \ option("partitionColumn", "partitionColumn"). \ option("lowerBound", "<lowest partition number>"). \ option("upperBound", "<largest partition number>"). \ option("numPartitions", "<number of partitions>"). \ load()