Kerberos 环境中与 Hbase 的 Spark 连接失败
Spark connection to Hbase in Kerberos environement failing
我正在使用
- Spark 1.6.0(
spark-1.2.0-cdh5.10.2
)
- cloudera 虚拟机 (
spark-1.2.0-cdh5.10.2
)
- Hbase(来自cloudera的1.2.0)
- 斯卡拉 2.10
- 已启用 Kerberos
我运行的步骤是:
kinit
(这样我的用户就到位了)
2.
spark-shell --master yarn --executor-memory 256m --jars /opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-spark-1.2.0-cdh5.10.2.jar
3.
```
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseConfiguration }
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.client.Scan
val tableName = "web-table"
val scan = new Scan()
scan.setCaching(100)
//sc.setLogLevel("DEBUG")
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "quickstart.cloudera");
conf.set("hbase.client.retries.number", Integer.toString(1));
conf.set("zookeeper.session.timeout", Integer.toString(60000));
conf.set("zookeeper.recovery.retry", Integer.toString(20))
val hbaseContext = new HBaseContext(sc, conf)
val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
getRdd.take(1)
```
以上代码失败并显示以下堆栈跟踪
```
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=1, exceptions:
Wed Feb 07 20:30:27 PST 2018, RpcRetryingCaller{globalStartTime=1518064227140, pause=100, retries=1}, java.io.IOException: Broken pipe
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:147)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.hbase.ipc.IPCUtil.write(IPCUtil.java:278)
at org.apache.hadoop.hbase.ipc.IPCUtil.write(IPCUtil.java:266)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:921)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:874)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1243)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:400)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:65)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:381)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:355)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
```
如果有人看到此错误并知道解决方案,请告诉我。
另外值得一提的是
- 我已经尝试向 spark 应用程序提供 --principal 和 --keytab
- 我在 HBase 配置中提供了更多配置,例如 jaas。
然而在调试模式下,这个错误看起来非常可疑,这让我想知道 zookeeper 通过 spark 与 Hbase 的交互是否有问题。
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
这是重现同一问题的另一种方法
kinit...
```
spark-submit --master yarn --executor-memory 256m --class org.apache.hadoop.hbase.spark.example.hbasecontext.HBaseDistributedScanExample /opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-spark-1.2.0-cdh5.10.2.jar web-table
```
代码在扫描区域阶段停留了一段时间,然后失败 broken pipe error
更多动物园管理员相关消息。那里的专家可能会感兴趣。
```
18/02/07 20:51:05 INFO zookeeper.ZooKeeper: Client environment:user.dir=/home/cloudera/Desktop
18/02/07 20:51:05 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x1c053d8a0x0, quorum=localhost:2181, baseZNode=/hbase
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /127.0.0.1:49815, server: localhost/127.0.0.1:2181
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x161735f4d4700d4, negotiated timeout = 60000
18/02/07 20:51:05 INFO util.RegionSizeCalculator: Calculating region sizes for table "web-table".
18/02/07 20:51:53 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x161735f4d4700d4
18/02/07 20:51:53 INFO zookeeper.ZooKeeper: Session: 0x161735f4d4700d4 closed
18/02/07 20:51:53 INFO zookeeper.ClientCnxn: EventThread shut down
```
这个问题仅仅是因为Cloudera VM enable kerberos
没有完全让系统准备就绪。
所以解决方案(简单的方法)是
Spark-> Configruation -> HBase Service -> HBase(Default is None)
这一步最终添加了一堆配置,让 spark 通过 kerberos
与 HBase 对话。
我正在使用
- Spark 1.6.0(
spark-1.2.0-cdh5.10.2
) - cloudera 虚拟机 (
spark-1.2.0-cdh5.10.2
) - Hbase(来自cloudera的1.2.0)
- 斯卡拉 2.10
- 已启用 Kerberos
我运行的步骤是:
kinit
(这样我的用户就到位了)
2.
spark-shell --master yarn --executor-memory 256m --jars /opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-spark-1.2.0-cdh5.10.2.jar
3.
```
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseConfiguration }
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.client.Scan
val tableName = "web-table"
val scan = new Scan()
scan.setCaching(100)
//sc.setLogLevel("DEBUG")
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "quickstart.cloudera");
conf.set("hbase.client.retries.number", Integer.toString(1));
conf.set("zookeeper.session.timeout", Integer.toString(60000));
conf.set("zookeeper.recovery.retry", Integer.toString(20))
val hbaseContext = new HBaseContext(sc, conf)
val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
getRdd.take(1)
```
以上代码失败并显示以下堆栈跟踪
```
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=1, exceptions:
Wed Feb 07 20:30:27 PST 2018, RpcRetryingCaller{globalStartTime=1518064227140, pause=100, retries=1}, java.io.IOException: Broken pipe
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:147)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.hbase.ipc.IPCUtil.write(IPCUtil.java:278)
at org.apache.hadoop.hbase.ipc.IPCUtil.write(IPCUtil.java:266)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:921)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:874)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1243)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:400)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:65)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:381)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:355)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
```
如果有人看到此错误并知道解决方案,请告诉我。
另外值得一提的是
- 我已经尝试向 spark 应用程序提供 --principal 和 --keytab
- 我在 HBase 配置中提供了更多配置,例如 jaas。
然而在调试模式下,这个错误看起来非常可疑,这让我想知道 zookeeper 通过 spark 与 Hbase 的交互是否有问题。
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
这是重现同一问题的另一种方法
kinit...
```
spark-submit --master yarn --executor-memory 256m --class org.apache.hadoop.hbase.spark.example.hbasecontext.HBaseDistributedScanExample /opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-spark-1.2.0-cdh5.10.2.jar web-table
```
代码在扫描区域阶段停留了一段时间,然后失败 broken pipe error
更多动物园管理员相关消息。那里的专家可能会感兴趣。
```
18/02/07 20:51:05 INFO zookeeper.ZooKeeper: Client environment:user.dir=/home/cloudera/Desktop
18/02/07 20:51:05 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x1c053d8a0x0, quorum=localhost:2181, baseZNode=/hbase
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /127.0.0.1:49815, server: localhost/127.0.0.1:2181
18/02/07 20:51:05 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x161735f4d4700d4, negotiated timeout = 60000
18/02/07 20:51:05 INFO util.RegionSizeCalculator: Calculating region sizes for table "web-table".
18/02/07 20:51:53 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x161735f4d4700d4
18/02/07 20:51:53 INFO zookeeper.ZooKeeper: Session: 0x161735f4d4700d4 closed
18/02/07 20:51:53 INFO zookeeper.ClientCnxn: EventThread shut down
```
这个问题仅仅是因为Cloudera VM enable kerberos
没有完全让系统准备就绪。
所以解决方案(简单的方法)是
Spark-> Configruation -> HBase Service -> HBase(Default is None)
这一步最终添加了一堆配置,让 spark 通过 kerberos
与 HBase 对话。