Apache Spark-SQL 与 Sqoop 基准测试,同时将数据从 RDBMS 传输到 hdfs
Apache Spark-SQL vs Sqoop benchmarking while transferring data from RDBMS to hdfs
我正在处理一个必须将数据从 RDBMS 传输到 HDFS 的用例。我们已经使用 sqoop 完成了这个案例的基准测试,发现我们能够在 6-7 分钟内传输大约 20GB 的数据。
当我尝试使用 Spark SQL 时,性能非常低(1 Gb 的记录从 netezza 传输到 hdfs 需要 4 分钟)。我正在尝试进行一些调整并提高其性能,但不太可能将其调整到 sqoop 的水平(1 分钟内大约 3 Gb 的数据)。
我同意 spark 主要是一个处理引擎这一事实,但我的主要问题是 spark 和 sqoop 都在内部使用 JDBC 驱动程序,所以为什么性能差异如此之大(或者可能是我错过了一些东西)。我正在 post 编写我的代码。
object helloWorld {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local")
val sc= new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
val df2 =sqlContext.sql("select * from POC")
val partitioner= new org.apache.spark.HashPartitioner(14)
val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values
rdd.saveAsTextFile("hdfs://Hostname/test")
}
}
我已经检查了许多其他 post 但无法得到关于 sqoop 的内部工作和调整的明确答案,也没有得到 sqoop vs spark sql 基准测试。请帮助理解这个问题。
您可以尝试以下方法:-
从 netezza 读取数据,没有任何分区,并且 fetch_size 增加到一百万。
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC")
在将数据写入最终文件之前对其进行重新分区。
val df3 = df2.repartition(10) //to reduce the shuffle
ORC 格式比 TEXT 格式更优化。将最终输出写入 parquet/ORC.
df3.write.format("ORC").save("hdfs://Hostname/test")
您在工作中使用了错误的工具。
Sqoop 将启动一系列进程(在数据节点上),每个进程都将连接到您的数据库(参见 num-mapper),并且每个进程都将提取数据集的一部分。我认为您无法使用 Spark 实现某种读取并行性。
用Sqoop获取数据集,然后用Spark处理。
@amitabh
虽然标记为答案,但我不同意。
一旦您在从 jdbc 读取数据时给出谓词对数据进行分区,spark 将 运行 为每个分区分离任务。在你的情况下,没有任务应该是 14(你可以使用 spark UI 来确认)。
我注意到您正在使用 local 作为 master,这只会为执行程序提供 1 个核心。因此不会有并行性。这就是你的情况。
现在要获得与 sqoop 相同的吞吐量,您需要确保这些任务 运行 是并行的。从理论上讲,这可以通过以下方式之一完成:
1.使用14个执行器,每个执行器1个核心
2. 使用 1 个 14 核执行器(频谱的另一端)
通常,我会为每个执行程序配备 4-5 个核心。因此,我使用 15/5= 3 个执行程序测试性能(我将 1 添加到 14 以考虑在集群模式下为驱动程序 运行ning 提供 1 个核心)。
使用:executor.cores、executor.instances in sparkConf.set 玩配置。
如果这没有显着提高性能,接下来就是查看执行程序内存。
最后,我会调整应用程序逻辑以查看 mapRDD 大小、分区大小和随机播放大小。
以下解决方案对我有帮助
var df=spark.read.format("jdbc").option("url","
"url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load()
df.registerTempTable("tempTable")
var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly
dfRepart.write.format("parquet").save("hdfs_location")
我遇到了同样的问题,因为您正在使用的代码片段不适用于分区。
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
您可以通过
检查在您的 spark 作业中创建的分区数
df.rdd.partitions.length
您可以使用以下代码连接数据库:
sqlContext.read.jdbc(url=db_url,
table=tableName,
columnName="ID",
lowerBound=1L,
upperBound=100000L,
numPartitions=numPartitions,
connectionProperties=connectionProperties)
要优化您的 Spark 作业,请使用以下参数:
1. 分区数
2.--num-executors
3.--executor-cores
4.--executor-memory
5.--driver-memory
6.fetch-size
2、3、4 和 5 选项取决于您的集群配置
您可以在 spark ui 上监控您的 spark 作业。
Sqoop 和 Spark SQL 都使用 JDBC 连接从 RDBMS 引擎获取数据,但 Sqoop 在这方面有优势,因为它专门用于在 RDBMS 和 HDFS 之间迁移数据。
Sqoop 中可用的每个选项都经过微调,以便在进行数据摄取时获得最佳性能。
您可以从讨论控制映射器数量的选项-m 开始。
这是从 RDBMS 并行获取数据所需要做的。我可以在 Spark SQL 中完成吗?
当然可以,但是开发人员需要照顾 "multithreading" Sqoop 已经自动照顾。
我正在处理一个必须将数据从 RDBMS 传输到 HDFS 的用例。我们已经使用 sqoop 完成了这个案例的基准测试,发现我们能够在 6-7 分钟内传输大约 20GB 的数据。
当我尝试使用 Spark SQL 时,性能非常低(1 Gb 的记录从 netezza 传输到 hdfs 需要 4 分钟)。我正在尝试进行一些调整并提高其性能,但不太可能将其调整到 sqoop 的水平(1 分钟内大约 3 Gb 的数据)。
我同意 spark 主要是一个处理引擎这一事实,但我的主要问题是 spark 和 sqoop 都在内部使用 JDBC 驱动程序,所以为什么性能差异如此之大(或者可能是我错过了一些东西)。我正在 post 编写我的代码。
object helloWorld {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local")
val sc= new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
val df2 =sqlContext.sql("select * from POC")
val partitioner= new org.apache.spark.HashPartitioner(14)
val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values
rdd.saveAsTextFile("hdfs://Hostname/test")
}
}
我已经检查了许多其他 post 但无法得到关于 sqoop 的内部工作和调整的明确答案,也没有得到 sqoop vs spark sql 基准测试。请帮助理解这个问题。
您可以尝试以下方法:-
从 netezza 读取数据,没有任何分区,并且 fetch_size 增加到一百万。
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC")
在将数据写入最终文件之前对其进行重新分区。
val df3 = df2.repartition(10) //to reduce the shuffle
ORC 格式比 TEXT 格式更优化。将最终输出写入 parquet/ORC.
df3.write.format("ORC").save("hdfs://Hostname/test")
您在工作中使用了错误的工具。
Sqoop 将启动一系列进程(在数据节点上),每个进程都将连接到您的数据库(参见 num-mapper),并且每个进程都将提取数据集的一部分。我认为您无法使用 Spark 实现某种读取并行性。
用Sqoop获取数据集,然后用Spark处理。
@amitabh 虽然标记为答案,但我不同意。
一旦您在从 jdbc 读取数据时给出谓词对数据进行分区,spark 将 运行 为每个分区分离任务。在你的情况下,没有任务应该是 14(你可以使用 spark UI 来确认)。
我注意到您正在使用 local 作为 master,这只会为执行程序提供 1 个核心。因此不会有并行性。这就是你的情况。
现在要获得与 sqoop 相同的吞吐量,您需要确保这些任务 运行 是并行的。从理论上讲,这可以通过以下方式之一完成: 1.使用14个执行器,每个执行器1个核心 2. 使用 1 个 14 核执行器(频谱的另一端)
通常,我会为每个执行程序配备 4-5 个核心。因此,我使用 15/5= 3 个执行程序测试性能(我将 1 添加到 14 以考虑在集群模式下为驱动程序 运行ning 提供 1 个核心)。 使用:executor.cores、executor.instances in sparkConf.set 玩配置。
如果这没有显着提高性能,接下来就是查看执行程序内存。
最后,我会调整应用程序逻辑以查看 mapRDD 大小、分区大小和随机播放大小。
以下解决方案对我有帮助
var df=spark.read.format("jdbc").option("url","
"url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load()
df.registerTempTable("tempTable")
var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly
dfRepart.write.format("parquet").save("hdfs_location")
我遇到了同样的问题,因为您正在使用的代码片段不适用于分区。
sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC")
您可以通过
检查在您的 spark 作业中创建的分区数df.rdd.partitions.length
您可以使用以下代码连接数据库:
sqlContext.read.jdbc(url=db_url,
table=tableName,
columnName="ID",
lowerBound=1L,
upperBound=100000L,
numPartitions=numPartitions,
connectionProperties=connectionProperties)
要优化您的 Spark 作业,请使用以下参数: 1. 分区数 2.--num-executors 3.--executor-cores 4.--executor-memory 5.--driver-memory 6.fetch-size
2、3、4 和 5 选项取决于您的集群配置 您可以在 spark ui 上监控您的 spark 作业。
Sqoop 和 Spark SQL 都使用 JDBC 连接从 RDBMS 引擎获取数据,但 Sqoop 在这方面有优势,因为它专门用于在 RDBMS 和 HDFS 之间迁移数据。
Sqoop 中可用的每个选项都经过微调,以便在进行数据摄取时获得最佳性能。
您可以从讨论控制映射器数量的选项-m 开始。
这是从 RDBMS 并行获取数据所需要做的。我可以在 Spark SQL 中完成吗? 当然可以,但是开发人员需要照顾 "multithreading" Sqoop 已经自动照顾。