Pyspark - df.cache().count() 永远 运行

Pyspark - df.cache().count() taking forever to run

我正在尝试使用我在网上阅读的计数方法强制对 PySpark 进行热切评估:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

spark_df.cache().count()

但是,当我尝试 运行ning 代码时,缓存计数部分永远占用 运行。我的数据量相对较小(2.7GB,1500 万行),但在 运行ning 28 分钟后,我决定结束这项工作。作为对比,当我使用pandas.read_sql()方法读取数据时,只用了6分43秒。

我正在 运行 编写代码的机器非常强大,(20 个 vCPU,160 GB RAM,Windows OS)。我相信我错过了加快计数语句的步骤。

如有任何帮助或建议,我们将不胜感激。

当您使用pandas读取时,它会从机器的可用内存中使用尽可能多的内存(假设您提到的所有160Gb,这远远大于数据本身~3Gb ).

但是,Spark 就不一样了。当您开始您的 Spark 会话时,通常您必须提及 预先 您想要使用的每个执行程序(以及驱动程序和应用程序管理器,如果适用)有多少内存,如果您不不指定它,根据 latest Spark documentation,它将是 1Gb。所以你要做的第一件事就是给你的执行者和驱动程序更多的内存。

其次,Spark 从 JDBC 中读取数据很棘手,因为速度慢与否取决于执行程序(和任务)的数量,而这些数字取决于您的 RDD(从 JDBC 连接)有,分区的数量取决于你的 table、你的查询、列、条件等。强制改变行为的一种方法,有更多的分区,更多的任务,更多的执行者,.. . 是通过这些配置:numPartitionspartitionColumnlowerBoundupperBound.

  • numPartitions 是分区的数量(因此将使用执行器的数量)
  • partitionColumn 是一个整数类型的列,Spark 将使用它来定位分区
  • lowerBound 是您要读取的 partitionColumn 的最小值
  • upperBound 是您要读取的 partitionColumn 的最大值

您可以在此处阅读更多内容 ,但基本思想是,您希望使用 合理数量的执行程序 (由 numPartitions 定义) ,为每个执行程序处理均匀分布的数据块(由partitionColumnlowerBoundupperBound定义)。