Spark/Glue:当 .count() 或在 ~20MM 记录和 1 名工人的数据帧上生成字段列表时的性能问题

Spark/Glue: performance issue when .count() or when generating fields' list on dataframe of ~20MM records and 1 worker

我正在尝试 运行 使用 AWS Glue 的简单 ETL 过程。

过程很简单:使用 JDBC 连接器从数据库中读取 20+ tables,然后将它们放入 S3。一切正常,唯一的问题是 运行 工作所需的时间(2 小时以上)。

主要瓶颈是由一些非常大的 table(16 到 2000 万条记录)造成的,而且我必须提取行数和字段列表。 粘合作业使用 Python 3 个,Spark 3 个,2 个工人(其中 1 个驱动程序)。

我第一次阅读 table:

df = sparkSession.read.format("jdbc").option("url", connection_url).option("dbtable", table).option("driver", DRIVER).load()

然后我将其转换为 GlueDynamicFrame(因为我更容易对其进行 运行 操作):

df = DynamicFrame.fromDF(df, glueContext, "df")

然后我继续计算行数:

n_rows = df.count()

痛苦的开始:对于某些 table(最大的),需要 10 到 20 分钟才能达到 return 这个值。我已经研究并(我认为)理解了 Spark 中惰性求值和计算的概念,但在我看来,这个操作无论如何都应该减少,我肯定做错了什么。无论如何,然后我继续生成一个字段列表:

fields = [df.schema().fields[x].name for x in range(0, len(df.schema().fields))]

再过 10 到 20 分钟 运行。最终,我下沉了数据框:

glueContext.write_dynamic_frame.\
            from_options(frame = df,
                        connection_type = "s3",
                        connection_options = {"path": path,
                                              "partitionKeys": [partition]},
                        format = "parquet")

同样,这些大 table 需要很长时间。

值得一提的是,我从数据库中提取的 tables 也包含很少的行。我提到这一点是因为我在阅读 table 后就已经阅读了重新分区的可能解决方案,但是对 3 行的 DataFrame 重新分区毫无意义。

系统地做这件事的唯一方法是先计算行数,然后根据 n_rows 重新分区,但它已经 forever.Also,我读到分区数应该是与工人数量有一定关系。我有 1 个工人,所以 1 个分区对我来说似乎合乎逻辑。

我的问题是:我做错了什么?我是否应该在阅读时增加工人数量并相应地重新分配?或者还有哪些其他解决方案可用? 非常感谢任何建议!

编辑:最终增加工作人员的数量、缓存以及使用 lower_bound/upper_bound 读取时的分区帮助很大。但对我帮助最大的是避免 df.count() 成为瘟疫。如果没有那个操作,工作持续时间会减少 80%...不知道为什么,因为我还是一个初学者,但是这个非常简单的操作不会真的那么昂贵...

我们在迁移过程中遇到了同样的挑战,并根据以下优化方法进行了优化。

优化:01

就像您提到的那样,n_rows = df.count() 是一项成本高昂的操作,请尽量避免在您的代码中使用该过程。

优化:02[生成字段列表]

我们已尝试通过示例记录 1 从源中获取架构。

src_connect_string = {'url':"jdbc:teradata://conntionstring,TMODE=TERA", 'user' : "username", 'password' : "mypassword",'query':"select  * from tablename limit 1 ",'driver' :"com.teradata.jdbc.TeraDriver"}
df_td_src=spark.read.format("jdbc").options(**src_connect_string).load()
src_td_columns=df_td_src.schema.names

优化:03

找到读取过程或写入过程花费较长时间的位置。基于此,我们可以使该过程并发运行。例如,由于我们的写作过程花费的时间较长,因此我们以并发方式进行写作过程。 ref .

的示例代码
jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'"+start_date+"'").replace("enddt", "'"+end_date+"'")
print(f"Query - {query}")

data_df = spark.read \
          .format('jdbc') \
          .options(url= jdbcurl, user= user,password= pw, query=query, driver= driver,numPartitions=100) \
          .option('customSchema', schema[1:-1]) \
          .option('ConnectionRetries', '3') \
          .option('ConnectionRetryInterval', '2000') \
          .option("fetchSize",100000) \
          .load()

# display(data_df)
from pyspark.sql.functions import *
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
from pyspark.sql import functions as f
from pyspark.sql.functions import col
date_range = ['2017-01-28']


def writeS3(curr_date):
    print(f"Starting S3 write for date - {curr_date}")
    data_df1 = data_df.withColumn("date1", f.from_unixtime(f.unix_timestamp(data_df.LD_TS), "yyyy-MM-dd"))
    display(data_df1)
    print(curr_date)
    save_df = data_df1.filter(f"date1='{curr_date}'").drop('date1')
    save_df.write.parquet(f"s3://location")

jobs = []
results_done = []

total_days = 30

with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
  print(f"{raw_bucket}/{db}/{table}/")
  for curr_date in date_range:
      print(f"Starting S3 write for date - {curr_date}")
      jobs.append(e.submit(writeS3, curr_date))
#       result_done = job.result()
#       print(f"Job Completed - {result_done}")
print("Task complete")

在处理 16-20 百万条记录时,我肯定会增加工人的数量。您真的想利用 Spark 的并行处理能力。

另外.count()是一个将强制Spark执行计划的动作。如果您想继续使用该 DataFrame,您应该可以使用 .cache() 来提高性能。

我相信您没有使用由 numPartitions 选项

控制的并行 JDBC 读取机制

您必须达到最佳 numPartitions 数字

  1. 根据分配的Executor core,一个Executor core执行一个partition。
  2. 将在执行器中并行执行的数据分区应完全适合内存以避免溢出。
df = spark.read. \
format("jdbc"). \
option("url", "URL"). \
option("user", "<username>"). \
option("password", "<password>"). \
option("dbtable", "<table>"). \
option("partitionColumn", "partitionColumn"). \
option("lowerBound", "<lowest partition number>"). \
option("upperBound", "<largest partition number>"). \
option("numPartitions", "<number of partitions>"). \
load()