作业之间的 Apache Spark 延迟
Apache Spark Delay Between Jobs
如您所见,我的小型应用程序有 4 个作业 运行 总持续时间为 20.2 秒,但是作业 1 和作业 2 之间存在很大的延迟,导致总时间超过了分钟。作业编号 1 运行 SparkHadoopMapReduceWriter.scala:88 的作业正在将 HFile 批量上传到 HBase table。这是我用来加载文件的代码
val outputDir = new Path(HBaseUtils.getHFilesStorageLocation(resolvedTableName))
val job = Job.getInstance(hBaseConf)
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, resolvedTableName)
job.setOutputFormatClass(classOf[HFileOutputFormat2])
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
val connection = ConnectionFactory.createConnection(job.getConfiguration)
val hBaseAdmin = connection.getAdmin
val table = TableName.valueOf(Bytes.toBytes(resolvedTableName))
val tab = connection.getTable(table).asInstanceOf[HTable]
val bulkLoader = new LoadIncrementalHFiles(job.getConfiguration)
preBulkUploadCallback.map(callback => callback())
bulkLoader.doBulkLoad(outputDir, hBaseAdmin, tab, tab.getRegionLocator)
如果有人有任何想法,我将非常感激
根据创建的 hfile 数,我可以看到作业 1 中有 26 个任务。即使作业 2 显示在 2 秒内完成,将这些文件复制到目标位置也需要一些时间,这就是作业 2 和作业 3 之间出现延迟的原因。这可以通过减少作业 1 中的任务数量来避免。
减少 Hbase 中输出 table 的区域数量,这将减少第二个作业的任务数量。
TableOutputFormat 根据 Hbase
中给定 table 的区域数量确定拆分
Job number 1 runJob at SparkHadoopMapReduceWriter.scala:88 is performing a bulkupload
这不完全正确。此作业仅创建 HBase outside 的 HFile。您看到的这份工作与下一份工作之间的差距可以用 实际批量加载 bulkLoader.doBulkLoad
来解释。此操作仅涉及元数据传输并且通常执行速度更快(根据我的经验),因此您应该检查驱动程序日志以查看它挂起的位置。
感谢你们的意见,我减少了任务 0 中创建的 HFile 的数量。这减少了大约 20% 的延迟。我用了
HFileOutputFormat2.configureIncrementalLoad(job, tab, tab.getRegionLocator)
它会自动计算 reduce 任务的数量以匹配 table 的当前区域数量。我会说我们在 AWS EMR 中使用由 S3 支持的 HBase,而不是传统的 HDFS。我现在要调查这是否会导致延迟。
如您所见,我的小型应用程序有 4 个作业 运行 总持续时间为 20.2 秒,但是作业 1 和作业 2 之间存在很大的延迟,导致总时间超过了分钟。作业编号 1 运行 SparkHadoopMapReduceWriter.scala:88 的作业正在将 HFile 批量上传到 HBase table。这是我用来加载文件的代码
val outputDir = new Path(HBaseUtils.getHFilesStorageLocation(resolvedTableName))
val job = Job.getInstance(hBaseConf)
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, resolvedTableName)
job.setOutputFormatClass(classOf[HFileOutputFormat2])
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
val connection = ConnectionFactory.createConnection(job.getConfiguration)
val hBaseAdmin = connection.getAdmin
val table = TableName.valueOf(Bytes.toBytes(resolvedTableName))
val tab = connection.getTable(table).asInstanceOf[HTable]
val bulkLoader = new LoadIncrementalHFiles(job.getConfiguration)
preBulkUploadCallback.map(callback => callback())
bulkLoader.doBulkLoad(outputDir, hBaseAdmin, tab, tab.getRegionLocator)
如果有人有任何想法,我将非常感激
根据创建的 hfile 数,我可以看到作业 1 中有 26 个任务。即使作业 2 显示在 2 秒内完成,将这些文件复制到目标位置也需要一些时间,这就是作业 2 和作业 3 之间出现延迟的原因。这可以通过减少作业 1 中的任务数量来避免。
减少 Hbase 中输出 table 的区域数量,这将减少第二个作业的任务数量。
TableOutputFormat 根据 Hbase
中给定 table 的区域数量确定拆分Job number 1 runJob at SparkHadoopMapReduceWriter.scala:88 is performing a bulkupload
这不完全正确。此作业仅创建 HBase outside 的 HFile。您看到的这份工作与下一份工作之间的差距可以用 实际批量加载 bulkLoader.doBulkLoad
来解释。此操作仅涉及元数据传输并且通常执行速度更快(根据我的经验),因此您应该检查驱动程序日志以查看它挂起的位置。
感谢你们的意见,我减少了任务 0 中创建的 HFile 的数量。这减少了大约 20% 的延迟。我用了
HFileOutputFormat2.configureIncrementalLoad(job, tab, tab.getRegionLocator)
它会自动计算 reduce 任务的数量以匹配 table 的当前区域数量。我会说我们在 AWS EMR 中使用由 S3 支持的 HBase,而不是传统的 HDFS。我现在要调查这是否会导致延迟。