sqoop import-all-tables slow and sequence files are custom java 对象
sqoop import-all-tables slow and sequence files are custom java objects
我正在努力将一个非常大的数据库同步到 hive。
有 2 个问题:(1) 文本导入速度较慢,并且存在较慢的大型 mapreduce 步骤。 (2) 序列文件速度更快,但无法通过正常方式读取。
详情如下:
(1) 如果我们将数据导入为文本,则速度较慢。文件累积在临时文件夹中的主目录中,但最终创建的 mapreduce 作业相当慢。
17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job: map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job: map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job: map 86% reduce 0% <-- tends to hang a very long time here
(为简洁起见删除了很多行。)
(2) 如果我们将文件导入为序列文件,速度会快得多,但是 Hive 无法读取检索到的数据,因为它需要了解自动生成的 Java 文件。这也有一个 mapreduce 步骤,但它似乎进行得更快(或者也许那是一天中的某个时间……)。
对于每个 table,我们都有一系列这样的 classes,由 sqoop 生成:
public class MyTableName 扩展 SqoopRecord 实现 DBWritable, Writable
使用这些 classes 的步骤是什么?我们如何在配置单元中安装它们?令人惊讶的是,Cloudera 支持工程师并不知道,因为这一定是不常被标示的区域??
sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '
有什么建议吗?
I am open to Spark. Do you have some sample code?
免责声明:我只是从多个笔记本中收集了一些片段,并且懒得(而且很饿)在离开办公室之前启动测试-运行。任何错误和拼写错误都由您来查找。
使用 Cloudera parcel 提供的 Spark 2.0 (支持 Hive),一个交互风格的 Scala 脚本,在本地模式下,没有任何数据分区,一个Microsoft SQL 服务器连接,并直接插入现有的 Hive 托管 table(带有一些额外的业务逻辑)...
spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar
// 旁注:自动注册类型 4 JDBC 驱动程序在多个 Spark 构建中被破坏,并且错误不断出现,因此指定驱动程序更安全 class 以防万一...
val weather = spark.read.format("jdbc").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", "jdbc:sqlserver://myhost\SQLExpress:9433;database=mydb").option("user", "mylogin").option("password", "*****").option("dbtable", "weather_obs").load()
{ printf( "%%% Partitions: %d / Records: %d\n", weather.rdd.getNumPartitions, weather.count)
println("%%% Detailed DF schema:")
weather.printSchema
}
// "dbtable"
使用子查询的替代方法:
// "(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x")
weather.registerTempTable("wth")
spark.sql(
"""
INSERT INTO TABLE somedb.sometable
SELECT station, dt_obs_utc, CAST(temp_k -273.15 AS DECIMAL(3,1)) as temp_c
FROM wth
WHERE temp_k IS NOT NULL
""")
dropTempTable("wth")
weather.unpersist()
现在,如果您想在使用 GZip 压缩的 Parquet 文件上动态创建 Hive 外部 table,请将 "temp table" 技巧替换为...
weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")
// Parquet 支持的压缩编解码器:none、snappy(默认)、gzip
// 支持的 CSV 压缩编解码器:none(默认)、snappy、lz4、gzip、bzip2
def toImpalaType(sparkType : String ) : String = {
if (sparkType == "StringType" || sparkType == "BinaryType") { return "string" }
if (sparkType == "BooleanType") { return "boolean" }
if (sparkType == "ByteType") { return "tinyint" }
if (sparkType == "ShortType") { return "smallint" }
if (sparkType == "IntegerType") { return "int" }
if (sparkType == "LongType") { return "bigint" }
if (sparkType == "FloatType") { return "float" }
if (sparkType == "DoubleType") { return "double" }
if (sparkType.startsWith("DecimalType")) { return sparkType.replace("DecimalType","decimal") }
if (sparkType == "TimestampType" || sparkType == "DateType") { return "timestamp" }
println("########## ERROR - \"" +sparkType +"\" not supported (bug)")
return "string"
}
spark.sql("DROP TABLE IF EXISTS somedb.sometable")
{ val query = new StringBuilder
query.append("CREATE EXTERNAL TABLE somedb.sometable")
val weatherSchema =weather.dtypes
val (colName0,colType0) = weatherSchema(0)
query.append("\n ( " +colName0 + " " +toImpalaType(colType0))
for ( i <- 2 to tempSchema.length) { val (colName_,colType_) = tempSchema(i-1) ; query.append("\n , " +colName_ + " " +toImpalaType(colType_)) }
query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
query.append("\nSTORED AS Parquet")
query.append("\nLOCATION 'hdfs:///some/directory'")
sqlContext.sql(query.toString())
query.clear()
}
如果您想对输入进行分区 table(基于数字列 - date/time 不支持 AFAIK)然后查看 JDBC 导入选项 partitionColumn
、lowerBound
和upperBound
.
如果你想在 YARN-client 模式下并行加载这些分区,那么添加一个 --jars
参数来将 JDBC 驱动程序上传到执行器。
我正在努力将一个非常大的数据库同步到 hive。
有 2 个问题:(1) 文本导入速度较慢,并且存在较慢的大型 mapreduce 步骤。 (2) 序列文件速度更快,但无法通过正常方式读取。
详情如下:
(1) 如果我们将数据导入为文本,则速度较慢。文件累积在临时文件夹中的主目录中,但最终创建的 mapreduce 作业相当慢。
17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job: map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job: map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job: map 86% reduce 0% <-- tends to hang a very long time here
(为简洁起见删除了很多行。)
(2) 如果我们将文件导入为序列文件,速度会快得多,但是 Hive 无法读取检索到的数据,因为它需要了解自动生成的 Java 文件。这也有一个 mapreduce 步骤,但它似乎进行得更快(或者也许那是一天中的某个时间……)。
对于每个 table,我们都有一系列这样的 classes,由 sqoop 生成: public class MyTableName 扩展 SqoopRecord 实现 DBWritable, Writable
使用这些 classes 的步骤是什么?我们如何在配置单元中安装它们?令人惊讶的是,Cloudera 支持工程师并不知道,因为这一定是不常被标示的区域??
sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '
有什么建议吗?
I am open to Spark. Do you have some sample code?
免责声明:我只是从多个笔记本中收集了一些片段,并且懒得(而且很饿)在离开办公室之前启动测试-运行。任何错误和拼写错误都由您来查找。
使用 Cloudera parcel 提供的 Spark 2.0 (支持 Hive),一个交互风格的 Scala 脚本,在本地模式下,没有任何数据分区,一个Microsoft SQL 服务器连接,并直接插入现有的 Hive 托管 table(带有一些额外的业务逻辑)...
spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar
// 旁注:自动注册类型 4 JDBC 驱动程序在多个 Spark 构建中被破坏,并且错误不断出现,因此指定驱动程序更安全 class 以防万一...
val weather = spark.read.format("jdbc").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", "jdbc:sqlserver://myhost\SQLExpress:9433;database=mydb").option("user", "mylogin").option("password", "*****").option("dbtable", "weather_obs").load()
{ printf( "%%% Partitions: %d / Records: %d\n", weather.rdd.getNumPartitions, weather.count)
println("%%% Detailed DF schema:")
weather.printSchema
}
// "dbtable"
使用子查询的替代方法:
// "(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x")
weather.registerTempTable("wth")
spark.sql(
"""
INSERT INTO TABLE somedb.sometable
SELECT station, dt_obs_utc, CAST(temp_k -273.15 AS DECIMAL(3,1)) as temp_c
FROM wth
WHERE temp_k IS NOT NULL
""")
dropTempTable("wth")
weather.unpersist()
现在,如果您想在使用 GZip 压缩的 Parquet 文件上动态创建 Hive 外部 table,请将 "temp table" 技巧替换为...
weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")
// Parquet 支持的压缩编解码器:none、snappy(默认)、gzip
// 支持的 CSV 压缩编解码器:none(默认)、snappy、lz4、gzip、bzip2
def toImpalaType(sparkType : String ) : String = {
if (sparkType == "StringType" || sparkType == "BinaryType") { return "string" }
if (sparkType == "BooleanType") { return "boolean" }
if (sparkType == "ByteType") { return "tinyint" }
if (sparkType == "ShortType") { return "smallint" }
if (sparkType == "IntegerType") { return "int" }
if (sparkType == "LongType") { return "bigint" }
if (sparkType == "FloatType") { return "float" }
if (sparkType == "DoubleType") { return "double" }
if (sparkType.startsWith("DecimalType")) { return sparkType.replace("DecimalType","decimal") }
if (sparkType == "TimestampType" || sparkType == "DateType") { return "timestamp" }
println("########## ERROR - \"" +sparkType +"\" not supported (bug)")
return "string"
}
spark.sql("DROP TABLE IF EXISTS somedb.sometable")
{ val query = new StringBuilder
query.append("CREATE EXTERNAL TABLE somedb.sometable")
val weatherSchema =weather.dtypes
val (colName0,colType0) = weatherSchema(0)
query.append("\n ( " +colName0 + " " +toImpalaType(colType0))
for ( i <- 2 to tempSchema.length) { val (colName_,colType_) = tempSchema(i-1) ; query.append("\n , " +colName_ + " " +toImpalaType(colType_)) }
query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
query.append("\nSTORED AS Parquet")
query.append("\nLOCATION 'hdfs:///some/directory'")
sqlContext.sql(query.toString())
query.clear()
}
如果您想对输入进行分区 table(基于数字列 - date/time 不支持 AFAIK)然后查看 JDBC 导入选项
partitionColumn
、lowerBound
和upperBound
.
如果你想在 YARN-client 模式下并行加载这些分区,那么添加一个 --jars
参数来将 JDBC 驱动程序上传到执行器。