使用 Apache Spark 查询多个 Hive 存储

Querying on multiple Hive stores using Apache Spark

我有一个 spark 应用程序,它将成功连接到 hive 并使用 spark 引擎查询 hive 表。

为了构建它,我只是将 hive-site.xml 添加到应用程序的类路径中,spark 将读取 hive-site.xml 以连接到其 Metastore。这个方法在spark的邮件列表中被推荐。

到目前为止一切顺利。现在我想连接到两个蜂巢商店,我认为将另一个 hive-site.xml 添加到我的类路径不会有帮助。我参考了很多文章和 spark 邮件列表,但找不到任何人这样做。

有人可以建议我如何实现吗?

谢谢。

参考文档:

这在当前版本的 Spark 中似乎是不可能的。阅读 Spark Repo 中的 HiveContext code,似乎 hive.metastore.uris 是许多 Metastore 可配置的东西,但它似乎仅用于同一 Metastore 之间的冗余,而不是完全不同的 Metastore。

这里有更多信息https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin

但是您可能必须将数据聚合到某处才能统一处理。或者您可以为每个商店创建多个 Spark 上下文。

您可以尝试为多个不同的 Metastore 配置 hive.metastore.uris,但这可能行不通。如果您确实决定为每个商店创建多个 Spark 上下文,请务必设置 spark.driver.allowMultipleContexts,但通常不鼓励这样做,并且可能会导致意外结果。

我认为这可以通过使用 Spark SQL 使用 JDBC 从远程数据库连接和读取数据的功能来实现。

经过详尽的研发,我成功地能够使用 JDBC 连接到两个不同的 Hive 环境,并将 Hive 表作为 DataFrames 加载到 Spark 中进行进一步处理。

环境详细信息

hadoop-2.6.0

apache-hive-2.0.0-bin

spark-1.3.1-bin-hadoop2.6

代码示例 HiveMultiEnvironment.scala

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
object HiveMultiEnvironment {
  def main(args: Array[String]) {
    var conf = new SparkConf().setAppName("JDBC").setMaster("local")
    var sc = new SparkContext(conf)
    var sqlContext = new SQLContext(sc)

    // load hive table (or) sub-query from Environment 1

    val jdbcDF1 = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:hive2://<host1>:10000/<db>",
      "dbtable" -> "<db.tablename or subquery>",
      "driver" -> "org.apache.hive.jdbc.HiveDriver",
      "user" -> "<username>",
      "password" -> "<password>"))
    jdbcDF1.foreach { println }
      
    // load hive table (or) sub-query from Environment 2

    val jdbcDF2 = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:hive2://<host2>:10000/<db>",
      "dbtable" -> "<db.tablename> or <subquery>",
      "driver" -> "org.apache.hive.jdbc.HiveDriver",
      "user" -> "<username>",
      "password" -> "<password>"))
    jdbcDF2.foreach { println }
  }
  // todo: business logic
}

其他参数也可以在加载期间使用 SqlContext 设置,例如设置 partitionColumn。在 Spark 参考文档的 'JDBC To Other Databases' 部分下找到的详细信息: https://spark.apache.org/docs/1.3.0/sql-programming-guide.html

从 Eclipse 构建路径:

我还没有尝试过的东西

在环境 1 中使用 HiveContext,在环境 2 中使用 SqlContext

希望这会有所帮助。