执行一个多小时 pyspark.sql.DataFrame.take(4)
More than one hour to execute pyspark.sql.DataFrame.take(4)
我在 3 个虚拟机(即 1 个主服务器;2 个从属服务器)上 运行ning spark 1.6,均具有 4 个内核和 16GB 内存。
我可以看到在spark-master webUI上注册的worker。
我想从我的 Vertica 数据库中检索数据以对其进行处理。由于我没有设法 运行 复杂的查询,所以我尝试了虚拟查询来理解。我们认为这是一件容易的事。
我的代码是:
df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)
输出是(注意:我用 @IPSLAVE
替换从属 VM IP:Port):
16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s
如您所见,这需要很长时间。
我的 table 实际上相当大(存储大约 2.2 亿行,每行 11 个字段)但是使用 "normal" sql(例如 pyodbc)可以立即执行这样的查询。
我想我是 missunderstanding/missusing Spark,您有什么想法或建议可以让它更好地工作吗?
虽然 Spark 支持对 JDBC 的有限谓词下推,但所有其他操作(如限制、组、聚合)都在内部执行。不幸的是,这意味着 take(4)
将首先获取数据然后应用 limit
。换句话说,您的数据库将执行(假设没有投影和过滤器)等同于:
SELECT * FROM table
其余的将由 Spark 处理。涉及一些优化(特别是 Spark 以获得 LIMIT
请求的记录数),但与数据库端优化相比,它仍然是非常低效的过程。
如果您想将 limit
推送到数据库,您必须使用子查询作为 dbtable
参数静态地执行此操作:
(sqlContext.read.format('jdbc')
.options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
"url" -> "xxxx",
"dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))
请注意子查询中的别名是强制性的。
注:
一旦数据源 API v2 准备就绪,此行为将来可能会得到改进:
我在 3 个虚拟机(即 1 个主服务器;2 个从属服务器)上 运行ning spark 1.6,均具有 4 个内核和 16GB 内存。
我可以看到在spark-master webUI上注册的worker。
我想从我的 Vertica 数据库中检索数据以对其进行处理。由于我没有设法 运行 复杂的查询,所以我尝试了虚拟查询来理解。我们认为这是一件容易的事。
我的代码是:
df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)
输出是(注意:我用 @IPSLAVE
替换从属 VM IP:Port):
16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s
如您所见,这需要很长时间。 我的 table 实际上相当大(存储大约 2.2 亿行,每行 11 个字段)但是使用 "normal" sql(例如 pyodbc)可以立即执行这样的查询。
我想我是 missunderstanding/missusing Spark,您有什么想法或建议可以让它更好地工作吗?
虽然 Spark 支持对 JDBC 的有限谓词下推,但所有其他操作(如限制、组、聚合)都在内部执行。不幸的是,这意味着 take(4)
将首先获取数据然后应用 limit
。换句话说,您的数据库将执行(假设没有投影和过滤器)等同于:
SELECT * FROM table
其余的将由 Spark 处理。涉及一些优化(特别是 Spark LIMIT
请求的记录数),但与数据库端优化相比,它仍然是非常低效的过程。
如果您想将 limit
推送到数据库,您必须使用子查询作为 dbtable
参数静态地执行此操作:
(sqlContext.read.format('jdbc')
.options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
"url" -> "xxxx",
"dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))
请注意子查询中的别名是强制性的。
注:
一旦数据源 API v2 准备就绪,此行为将来可能会得到改进: