HBase 连接实例

HBase Connection Instance

我有以下代码:

DStream.map {
      _.message()
}.foreachRDD { rdd =>
    rdd.foreachPartition{iter =>
        val conf = HBaseUtils.configureHBase("iemployee")
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("""iemployee"""))
        iter.foreach{elem =>
        /* loop through the records in the partition and push them out to the DB */
    }
}

有人可以告诉我这里创建的连接对象 val connection = ConnectionFactory.createConnection(conf) 是否与每个分区中使用的连接对象相同(因为我从不关闭它),或者是否会为每个分区创建一个新的连接对象?

每个分区的新连接实例..

请看下面code & documentation of Connection Factory。还提到它的呼叫者有责任关闭连接。

 /**
   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
   * created from returned connection share zookeeper connection, meta cache, and connections
   * to region servers and masters.
   * <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned
   * connection instance.
   *
   * Typical usage:
   * <pre>
   * Connection connection = ConnectionFactory.createConnection(conf);
   * Table table = connection.getTable(TableName.valueOf("table1"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * @param conf configuration
   * @param user the user the connection is for
   * @param pool the thread pool to use for batch operations
   * @return Connection object for <code>conf</code>
   */
  public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    if (user == null) {
      UserProvider provider = UserProvider.instantiate(conf);
      user = provider.getCurrent();
    }

    String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionImplementation.class.getName());
    Class<?> clazz;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    try {
      // Default HCM#HCI is not accessible; make it so before invoking.
      Constructor<?> constructor =
        clazz.getDeclaredConstructor(Configuration.class,
          ExecutorService.class, User.class);
      constructor.setAccessible(true);
      return (Connection) constructor.newInstance(conf, pool, user);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}

希望对您有所帮助!!