sqoop import-all-tables slow and sequence files are custom java 对象

sqoop import-all-tables slow and sequence files are custom java objects

我正在努力将一个非常大的数据库同步到 hive。

有 2 个问题:(1) 文本导入速度较慢,并且存在较慢的大型 mapreduce 步骤。 (2) 序列文件速度更快,但无法通过正常方式读取。

详情如下:

(1) 如果我们将数据导入为文本,则速度较慢。文件累积在临时文件夹中的主目录中,但最终创建的 mapreduce 作业相当慢。

17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job:  map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job:  map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job:  map 86% reduce 0% <-- tends to hang a very long time here

(为简洁起见删除了很多行。)

(2) 如果我们将文件导入为序列文件,速度会快得多,但是 Hive 无法读取检索到的数据,因为它需要了解自动生成的 Java 文件。这也有一个 mapreduce 步骤,但它似乎进行得更快(或者也许那是一天中的某个时间……)。

对于每个 table,我们都有一系列这样的 classes,由 sqoop 生成: public class MyTableName 扩展 SqoopRecord 实现 DBWritable, Writable

使用这些 classes 的步骤是什么?我们如何在配置单元中安装它们?令人惊讶的是,Cloudera 支持工程师并不知道,因为这一定是不常被标示的区域??

sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '

有什么建议吗?

I am open to Spark. Do you have some sample code?

免责声明:我只是从多个笔记本中收集了一些片段,并且懒得(而且很饿)在离开办公室之前启动测试-运行。任何错误和拼写错误都由您来查找。


使用 Cloudera parcel 提供的 Spark 2.0 (支持 Hive),一个交互风格的 Scala 脚本,在本地模式下,没有任何数据分区,一个Microsoft SQL 服务器连接,并直接插入现有的 Hive 托管 table(带有一些额外的业务逻辑)...

spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar

// 旁注:自动注册类型 4 JDBC 驱动程序在多个 Spark 构建中被破坏,并且错误不断出现,因此指定驱动程序更安全 class 以防万一...

val weather = spark.read.format("jdbc").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", "jdbc:sqlserver://myhost\SQLExpress:9433;database=mydb").option("user", "mylogin").option("password", "*****").option("dbtable", "weather_obs").load()
{ printf( "%%% Partitions: %d / Records: %d\n", weather.rdd.getNumPartitions, weather.count)
  println("%%% Detailed DF schema:")
  weather.printSchema
}

// "dbtable" 使用子查询的替代方法:
// "(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x")

weather.registerTempTable("wth")
spark.sql(
    """
    INSERT INTO TABLE somedb.sometable
    SELECT station, dt_obs_utc, CAST(temp_k -273.15 AS DECIMAL(3,1)) as temp_c
    FROM wth
    WHERE temp_k IS NOT NULL
    """)
dropTempTable("wth")

weather.unpersist()


现在,如果您想在使用 GZip 压缩的 Parquet 文件上动态创建 Hive 外部 table,请将 "temp table" 技巧替换为...

weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")

// Parquet 支持的压缩编解码器:none、snappy(默认)、gzip
// 支持的 CSV 压缩编解码器:none(默认)、snappy、lz4、gzip、bzip2

def toImpalaType(sparkType : String ) : String = {
  if (sparkType == "StringType" || sparkType == "BinaryType")  { return "string" }
  if (sparkType == "BooleanType")                              { return "boolean" }
  if (sparkType == "ByteType")                                 { return "tinyint" }
  if (sparkType == "ShortType")                                { return "smallint" }
  if (sparkType == "IntegerType")                              { return "int" }
  if (sparkType == "LongType")                                 { return "bigint" }
  if (sparkType == "FloatType")                                { return "float" }
  if (sparkType == "DoubleType")                               { return "double" }
  if (sparkType.startsWith("DecimalType"))                     { return sparkType.replace("DecimalType","decimal") }
  if (sparkType == "TimestampType" || sparkType == "DateType") { return "timestamp" }
  println("########## ERROR - \"" +sparkType +"\" not supported (bug)")
  return "string"
}

spark.sql("DROP TABLE IF EXISTS somedb.sometable")
{ val query = new StringBuilder
  query.append("CREATE EXTERNAL TABLE somedb.sometable")
  val weatherSchema =weather.dtypes
  val (colName0,colType0) = weatherSchema(0)
  query.append("\n ( " +colName0 + " " +toImpalaType(colType0))
  for ( i <- 2 to tempSchema.length) { val (colName_,colType_) = tempSchema(i-1) ; query.append("\n , " +colName_ + " " +toImpalaType(colType_)) }
  query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
  query.append("\nSTORED AS Parquet")
  query.append("\nLOCATION 'hdfs:///some/directory'")
  sqlContext.sql(query.toString())
  query.clear()
}


如果您想对输入进行分区 table(基于数字列 - date/time 不支持 AFAIK)然后查看 JDBC 导入选项 partitionColumnlowerBoundupperBound.

如果你想在 YARN-client 模式下并行加载这些分区,那么添加一个 --jars 参数来将 JDBC 驱动程序上传到执行器。