Spark通过JDBC读取RDBMS时是否有参数分区?
Is there any parameter partitioning when Spark reads RDBMS through JDBC?
当我运行spark应用程序进行table同步时,报错信息如下:
19/10/16 01:37:40 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 51)
com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:590)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:57)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我认为这是table中数据量大造成的。我之前使用了mongo分区相关的参数,比如:spark.mongodb.input.partitioner
,spark.mongodb.input.partitionerOptions.partitionSizeMB
我想知道 Spark 在通过 JDBC?
读取 RDBMS 时是否有类似的分区参数
下面是参数及其描述,我们可以在使用 spark jdbc.
读取 RDBMS table 时使用这些参数
partitionColumn, lowerBound, upperBound - 如果指定其中任何一个,则必须全部指定这些选项。此外,必须指定 numPartitions。他们描述了如何在从多个 worker 并行读取时对 table 进行分区。 partitionColumn 必须是相关 table 中的数字、日期或时间戳列。请注意,lowerBound 和 upperBound 仅用于决定分区步幅,而不用于过滤 table 中的行。因此 table 中的所有行都将被分区并返回。此选项仅适用于阅读。
numPartitions-table读写最大可用于并行的分区数。这也决定了最大并发数 JDBC 连接。如果要写入的分区数超过此限制,我们会在写入前调用 coalesce(numPartitions) 将其减少到此限制。
fetchsize - JDBC 获取大小,它决定每次往返获取多少行。这有助于提高 JDBC 驱动程序的性能,这些驱动程序默认为低提取大小(例如,具有 10 行的 Oracle)。此选项仅适用于阅读。
请注意以上所有参数应该一起使用。下面是一个例子:-
spark.read.format("jdbc").
option("driver", driver).
option("url",url ).
option("partitionColumn",column name).
option("lowerBound", 10).
option("upperBound", 10000).
option("numPartitions", 10).
option("fetchsize",1000).
option("dbtable", query).
option("user", user).
option("password",password).load()
当我运行spark应用程序进行table同步时,报错信息如下:
19/10/16 01:37:40 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 51)
com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:590)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:57)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我认为这是table中数据量大造成的。我之前使用了mongo分区相关的参数,比如:spark.mongodb.input.partitioner
,spark.mongodb.input.partitionerOptions.partitionSizeMB
我想知道 Spark 在通过 JDBC?
下面是参数及其描述,我们可以在使用 spark jdbc.
读取 RDBMS table 时使用这些参数partitionColumn, lowerBound, upperBound - 如果指定其中任何一个,则必须全部指定这些选项。此外,必须指定 numPartitions。他们描述了如何在从多个 worker 并行读取时对 table 进行分区。 partitionColumn 必须是相关 table 中的数字、日期或时间戳列。请注意,lowerBound 和 upperBound 仅用于决定分区步幅,而不用于过滤 table 中的行。因此 table 中的所有行都将被分区并返回。此选项仅适用于阅读。
numPartitions-table读写最大可用于并行的分区数。这也决定了最大并发数 JDBC 连接。如果要写入的分区数超过此限制,我们会在写入前调用 coalesce(numPartitions) 将其减少到此限制。
fetchsize - JDBC 获取大小,它决定每次往返获取多少行。这有助于提高 JDBC 驱动程序的性能,这些驱动程序默认为低提取大小(例如,具有 10 行的 Oracle)。此选项仅适用于阅读。
请注意以上所有参数应该一起使用。下面是一个例子:-
spark.read.format("jdbc").
option("driver", driver).
option("url",url ).
option("partitionColumn",column name).
option("lowerBound", 10).
option("upperBound", 10000).
option("numPartitions", 10).
option("fetchsize",1000).
option("dbtable", query).
option("user", user).
option("password",password).load()