运行 在 apache Hadoop 上进行具有本地性质的 Spark 查询时的数据局部性问题
Problem with data locality when running Spark query with local nature on apache Hadoop
我有一个 Hadoop 集群,它使用 Apache Spark 查询保存在 Hadoop 上的 parquet 文件。例如,当我使用以下 PySpark 代码在 parquet 文件中查找单词时:
df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()
在 运行 执行此代码后,我转到 spark 应用程序 UI,阶段选项卡。我看到在 Any.
上设置了地区级别的总结,相比之下,由于此查询的性质,它必须在本地 运行 并且至少在 NODE_LOCAL
地区级别上。当我在 运行ning 时检查集群的网络 IO 时,我发现此查询使用网络(网络 IO 在查询 运行ning 时增加)。这种情况的 st运行ge 部分是 spark UI 的 shuffle 部分中显示的数字是最小的。
在 Apache Spark 邮件列表中的 Russell Spitzer 的帮助下确定此问题的根本原因,我 运行 使用以下代码来查找每个分区的首选位置。这段代码的结果让我离解决这个问题又近了一步。我发现首选位置是 IP 形式而不是主机名,但 spark 使用执行程序的 IP 来匹配首选位置并实现数据局部性。
scala> def getRootRdd( rdd:RDD[_] ): RDD[_] = { if(rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd)}
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]
scala> val rdd = spark.read.parquet("hdfs://test/parquets/*").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24
scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24
scala> scan.partitions.map(scan.preferredLocations)
res12: Array[Seq[String]] = Array(WrappedArray(datanode-1, datanode-2, datanode-3), WrappedArray(datanode-2, datanode-3, datanode-4), WrappedArray(datanode-3, datanode-4, datanode-5),...
现在我想办法让 Spark 首先解析主机名,然后将它们与执行程序的 IP 进行匹配。有什么建议吗?
造成此问题的原因是 Spark 的首选位置
来自 Hadoop 的分区是数据节点主机名,
但是 Spark worker 通过 IP 注册到 Spark master。
Spark 正在尝试将任务草拟到执行程序上的 运行
与本地分区。因为执行者被映射到 IP
和分区到主机名,调度程序无法将 IP 与主机名匹配,
任务总是 运行 在“任何”地区级别。
为了解决这个问题,我们必须 运行 带有 -h [hostname]
标志的 spark-workers。
结果worker在master上注册的是主机名而不是IP,问题解决
我有一个 Hadoop 集群,它使用 Apache Spark 查询保存在 Hadoop 上的 parquet 文件。例如,当我使用以下 PySpark 代码在 parquet 文件中查找单词时:
df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()
在 运行 执行此代码后,我转到 spark 应用程序 UI,阶段选项卡。我看到在 Any.
上设置了地区级别的总结,相比之下,由于此查询的性质,它必须在本地 运行 并且至少在 NODE_LOCAL
地区级别上。当我在 运行ning 时检查集群的网络 IO 时,我发现此查询使用网络(网络 IO 在查询 运行ning 时增加)。这种情况的 st运行ge 部分是 spark UI 的 shuffle 部分中显示的数字是最小的。
在 Apache Spark 邮件列表中的 Russell Spitzer 的帮助下确定此问题的根本原因,我 运行 使用以下代码来查找每个分区的首选位置。这段代码的结果让我离解决这个问题又近了一步。我发现首选位置是 IP 形式而不是主机名,但 spark 使用执行程序的 IP 来匹配首选位置并实现数据局部性。
scala> def getRootRdd( rdd:RDD[_] ): RDD[_] = { if(rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd)}
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]
scala> val rdd = spark.read.parquet("hdfs://test/parquets/*").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24
scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24
scala> scan.partitions.map(scan.preferredLocations)
res12: Array[Seq[String]] = Array(WrappedArray(datanode-1, datanode-2, datanode-3), WrappedArray(datanode-2, datanode-3, datanode-4), WrappedArray(datanode-3, datanode-4, datanode-5),...
现在我想办法让 Spark 首先解析主机名,然后将它们与执行程序的 IP 进行匹配。有什么建议吗?
造成此问题的原因是 Spark 的首选位置
来自 Hadoop 的分区是数据节点主机名,
但是 Spark worker 通过 IP 注册到 Spark master。
Spark 正在尝试将任务草拟到执行程序上的 运行
与本地分区。因为执行者被映射到 IP
和分区到主机名,调度程序无法将 IP 与主机名匹配,
任务总是 运行 在“任何”地区级别。
为了解决这个问题,我们必须 运行 带有 -h [hostname]
标志的 spark-workers。
结果worker在master上注册的是主机名而不是IP,问题解决