Issue: Scala code in Spark shell to retrieve data from Hbase

We are trying to execute a simple piece of Scala code in the Spark shell to retrieve data from HBase. The Hadoop environment is Kerberos-enabled, and we made sure to run kinit first.

Steps to invoke the Spark shell:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

Code:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

Below is the ERROR:

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access0(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

Please note:

  1. We are able to invoke the HBase shell from the same session and scan records from the same table.
  2. We are able to run a word count against an HDFS file from the same Spark shell session.
  3. We are able to execute the code above in local mode.
  4. We are able to execute other operations from the same spark-shell session, for example: a. val admin = new HBaseAdmin(hc) b. print(admin.isTableAvailable("poc-customers")) (a cleaned-up version of this check is shown right after this list).
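
For clarity, here is the check from point 4 written out as it would be entered in the spark-shell; this is just the snippet from the list above, reformatted (HBaseAdmin comes from the org.apache.hadoop.hbase.client._ import already shown earlier):

val admin = new HBaseAdmin(hc)
// this call succeeds from the same spark-shell session
print(admin.isTableAvailable("poc-customers"))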

Looking for help to resolve this issue.

The root cause of your problem is:

GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

The Cloudera Troubleshooting Guide offers a solution to this problem:

Description: A user must have a valid Kerberos ticket in order to interact with a secure Hadoop cluster. Running any Hadoop command (such as hadoop fs -ls) will fail if you do not have a valid Kerberos ticket in your credentials cache. If you do not have a valid ticket, you will receive an error such as:

11/01/04 12:08:12 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Bad connection to FS. command aborted. exception: Call to nn-host/10.0.0.2:8020 failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Solution: You can examine the Kerberos tickets currently in your credentials cache by running the klist command. You can obtain a ticket by running the kinit command and either specifying a keytab file containing credentials, or entering the password for your principal.

You can try the suggested solution.

When the Spark "driver" requests YARN to spawn its "executors" somewhere in the cluster, it authenticates with its local Kerberos TGT -- the one you created with kinit. YARN then issues a global delegation token that is shared by all executors to access HDFS and YARN.

Alas, HBase does not support that delegation token. Each executor has to re-authenticate to ZK, and then to the actual HBase RegionServer, with a local TGT.

In a perfect world, you would only need to insert two properties in "spark-default.conf", namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is something you do with the "ktutil" utility).
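
For illustration only, the two properties might look like this in spark-defaults.conf (the principal and keytab path below are hypothetical placeholders, not values from the original post):

spark.yarn.principal    poc_user@EXAMPLE.COM
spark.yarn.keytab       /etc/security/keytabs/poc_user.keytab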

Alas, that feature was built for long-running Streaming jobs that need to renew their HDFS delegation token (typically every 7 days), not for the initial authentication to HBase. Now, the release notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos, so maybe the feature now works out of the box for HBase as well. But I wouldn't bet on it.

So what is the workaround?

  1. In the Java code run by the driver, state that the keytab file must be shipped to each executor with an addFile().
  2. In the Java code run by the executors, explicitly create a Hadoop UserGroupInformation from that keytab before connecting to HBase (a minimal sketch follows this list).
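
A minimal sketch of those two steps, assuming a hypothetical keytab at /etc/security/keytabs/poc_user.keytab, principal poc_user@EXAMPLE.COM, and an existing RDD called someRdd (none of these names come from the original post):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// Driver side: ship the keytab to every executor
sc.addFile("/etc/security/keytabs/poc_user.keytab")

someRdd.foreachPartition { partition =>
  // Executor side: log in from the shipped keytab before touching HBase
  val keytab = SparkFiles.get("poc_user.keytab")
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("poc_user@EXAMPLE.COM", keytab)
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val conf = HBaseConfiguration.create()
      // ... open an HBase connection here and process the partition ...
    }
  })
}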

Note that, when used this way, the UGI keeps its TGT private -- it does not show up in the cache, so other processes on the same machine cannot reuse it (and, on the other hand, a kinit from another process will not clobber it).

I am on the same project as the OP. We did not use Samson Scharfrichter's answer directly, but it convinced us that this kind of solution was feasible. Here is what worked for us:

We are now using an RDD from SparkOnHBase (https://github.com/cloudera-labs/SparkOnHBase), but we have incorporated the change suggested at https://github.com/cloudera-labs/SparkOnHBase/pull/7. Since that pull request is still open, its change can also be applied by subclassing:

import com.cloudera.spark.hbase.{HBaseContext, HBaseScanRDD}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

class MyHBaseScanRDD (sc: SparkContext,
    @transient tableName: String,
    @transient scan: Scan,
    configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends HBaseScanRDD(sc, tableName, scan, configBroadcast) {
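  // Note: jobTransient is not defined in this snippet. It is presumably a Hadoop mapreduce Job
  // built from the broadcast HBase configuration, with its HBase tokens initialized (for example
  // via TableMapReduceUtil.initCredentials), whose Credentials are broadcast on the next line.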
  val jobCredentialBroadcast = sc.broadcast(new SerializableWritable(jobTransient.getCredentials))

  override def addCreds {
    val creds = SparkHadoopUtil.get.getCurrentUserCredentials
    @transient val ugi = UserGroupInformation.getCurrentUser
    ugi.addCredentials(creds)
    ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
    ugi.addCredentials(jobCredentialBroadcast.value.value)
  }
}

class MyHBaseContext (sc: SparkContext,
    @transient config: Configuration,
    val tmpHdfsConfigFile: String = null) extends HBaseContext(sc, config, tmpHdfsConfigFile) {
  def myHBaseScanRDD(tableName: String, scan: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
    new MyHBaseScanRDD(sc, tableName, scan, broadcastedConf)
  }
}

val hc = HBaseConfiguration.create
val scan = new Scan
val hbaseContext = new MyHBaseContext(sc, hc)
val rdd = hbaseContext.myHBaseScanRDD("tableName", scan)
rdd.count

It looks like these changes have been merged into HBase's hbase-spark module, the successor of SparkOnHBase. Versioning issues kept us from using the newer HBase libraries, but I would recommend that anyone facing this problem try that module first.
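
For anyone trying that route, a rough sketch of the equivalent read with the hbase-spark module's HBaseContext might look like the following (written from my understanding of that module's API; check it against the HBase version you actually deploy):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext

val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
// hbaseRDD yields (ImmutableBytesWritable, Result) pairs, similar to newAPIHadoopRDD above
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("poc-customers"), new Scan())
scanRdd.count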