将 MySQL table 转换为镶木地板时出现 Spark 异常
Spark Exception when converting a MySQL table to parquet
我正在尝试使用 spark 1.6.2 将 MySQL 远程 table 转换为 parquet 文件。
该进程运行了 10 分钟,填满了内存,然后以这些消息启动:
WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
最后失败并出现此错误:
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded
我 运行 它在 spark-shell 中使用这些命令:
spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G
val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()
dataframe_mysql.saveAsParquetFile("name.parquet")
我将最大执行程序内存限制为 12G。有没有办法强制在 "small" 块中写入 parquet 文件以释放内存?
问题似乎是您在使用 jdbc 连接器读取数据时没有定义分区。
从 JDBC 中读取默认情况下不分发,因此要启用分发,您必须设置手动分区。您需要一个很好的分区键列,并且您必须预先知道分布。
这显然是您的数据的样子:
root
|-- id: long (nullable = false)
|-- order_year: string (nullable = false)
|-- order_number: string (nullable = false)
|-- row_number: integer (nullable = false)
|-- product_code: string (nullable = false)
|-- name: string (nullable = false)
|-- quantity: integer (nullable = false)
|-- price: double (nullable = false)
|-- price_vat: double (nullable = false)
|-- created_at: timestamp (nullable = true)
|-- updated_at: timestamp (nullable = true)
order_year
对我来说似乎是个不错的候选人。 (根据您的评论,您似乎有 ~20 年)
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = ???
val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???
// Manual partitioning
val partitionColumn: String = "order_year"
val options: Map[String, String] = Map("driver" -> driver,
"url" -> connectionUrl,
"dbtable" -> query,
"user" -> userName,
"password" -> password,
"partitionColumn" -> partitionColumn,
"lowerBound" -> "0",
"upperBound" -> "3000",
"numPartitions" -> "300"
)
val df = sqlContext.read.format("jdbc").options(options).load()
PS: partitionColumn
, lowerBound
, upperBound
, numPartitions
:
如果指定其中任何一个,则必须全部指定这些选项。
现在您可以将 DataFrame
保存到镶木地板上。
我正在尝试使用 spark 1.6.2 将 MySQL 远程 table 转换为 parquet 文件。
该进程运行了 10 分钟,填满了内存,然后以这些消息启动:
WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
最后失败并出现此错误:
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded
我 运行 它在 spark-shell 中使用这些命令:
spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G
val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()
dataframe_mysql.saveAsParquetFile("name.parquet")
我将最大执行程序内存限制为 12G。有没有办法强制在 "small" 块中写入 parquet 文件以释放内存?
问题似乎是您在使用 jdbc 连接器读取数据时没有定义分区。
从 JDBC 中读取默认情况下不分发,因此要启用分发,您必须设置手动分区。您需要一个很好的分区键列,并且您必须预先知道分布。
这显然是您的数据的样子:
root
|-- id: long (nullable = false)
|-- order_year: string (nullable = false)
|-- order_number: string (nullable = false)
|-- row_number: integer (nullable = false)
|-- product_code: string (nullable = false)
|-- name: string (nullable = false)
|-- quantity: integer (nullable = false)
|-- price: double (nullable = false)
|-- price_vat: double (nullable = false)
|-- created_at: timestamp (nullable = true)
|-- updated_at: timestamp (nullable = true)
order_year
对我来说似乎是个不错的候选人。 (根据您的评论,您似乎有 ~20 年)
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = ???
val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???
// Manual partitioning
val partitionColumn: String = "order_year"
val options: Map[String, String] = Map("driver" -> driver,
"url" -> connectionUrl,
"dbtable" -> query,
"user" -> userName,
"password" -> password,
"partitionColumn" -> partitionColumn,
"lowerBound" -> "0",
"upperBound" -> "3000",
"numPartitions" -> "300"
)
val df = sqlContext.read.format("jdbc").options(options).load()
PS: partitionColumn
, lowerBound
, upperBound
, numPartitions
:
如果指定其中任何一个,则必须全部指定这些选项。
现在您可以将 DataFrame
保存到镶木地板上。