Spark SASL 无法使用 yarn 在 emr 上工作
Spark SASL not working on the emr with yarn
首先,我想说的是,我所看到的唯一解决此问题的方法是:Spark 1.6.1 SASL。但是,在添加spark和yarn认证的配置时,还是不行。下面是我在亚马逊 emr 上的纱线集群上使用 spark-submit 的 spark 配置:
SparkConf sparkConf = new SparkConf().setAppName("secure-test");
sparkConf.set("spark.authenticate.enableSaslEncryption", "true");
sparkConf.set("spark.network.sasl.serverAlwaysEncrypt", "true");
sparkConf.set("spark.authenticate", "true");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
try {
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.LongWritable"),
Class.forName("org.apache.hadoop.io.Text")
});
} catch (Exception e) {}
sparkContext = new JavaSparkContext(sparkConf);
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true");
sparkContext.hadoopConfiguration().set("spark.authenticate", "true");
请注意,我在代码中将 spark.authenticate 添加到 sparkContext 的 hadoop 配置中,而不是核心-site.xml(我假设我可以这样做,因为其他事情也能正常工作)。
看这里:https://github.com/apache/spark/blob/master/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java 似乎两个 spark.authenticate 都是必需的。当我 运行 这个应用程序时,我得到以下堆栈跟踪。
17/01/03 22:10:23 INFO storage.BlockManager: Registering executor with local external shuffle service.
17/01/03 22:10:23 ERROR client.TransportClientFactory: Exception while bootstrapping client after 178 ms
java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown message type: -22
at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:71)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
在 Spark 的文档中,它说
For Spark on YARN deployments, configuring spark.authenticate to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret.
根据上面 yarn 文件中的评论,这似乎是错误的,但是通过故障排除,我仍然不知道应该去哪里让 sasl 工作?我是否遗漏了某处记录的明显内容?
所以我终于想通了。之前的 Whosebug 线程在技术上是正确的。我需要将 spark.authenticate 添加到纱线配置中。也许可以这样做,但我无法弄清楚如何在代码中添加此配置,这在较高层次上是有意义的,为什么会这样。我将 post 我的配置如下,以防将来其他人遇到此问题。
首先,我使用了 aws emr 配置文件(一个例子是使用 aws cli aws emr create-cluster --configurations file://youpathhere.json
)
然后,我在文件中添加了以下内容json:
[{
"Classification": "spark-defaults",
"Properties": {
"spark.authenticate": "true",
"spark.authenticate.enableSaslEncryption": "true",
"spark.network.sasl.serverAlwaysEncrypt": "true"
}
},
{
"Classification": "core-site",
"Properties": {
"spark.authenticate": "true"
}
}]
在为 Spark 网络加密添加配置选项后,我在 Dataproc(Google 云平台)上的 Spark 上收到了相同的错误消息。
我最初使用以下命令创建了 Dataproc 集群。
gcloud dataproc clusters create test-encryption --no-address \
--service-account=<SERVICE-ACCOUNT> \
--zone=europe-west3-c --region=europe-west3 \
--subnet=<SUBNET> \
--properties 'spark:spark.authenticate=true,spark:spark.network.crypto.enabled=true'
解决方案是另外添加配置 'yarn:spark.authenticate=true'
。因此,可以按如下方式创建具有 Spark 的 RPC 加密的工作 Dataproc 集群。
gcloud dataproc clusters create test-encryption --no-address \
--service-account=<SERVICE-ACCOUNT> \
--zone=europe-west3-c --region=europe-west3 \
--subnet=<SUBNET> \
--properties 'spark:spark.authenticate=true,spark:spark.network.crypto.enabled=true,yarn:spark.authenticate=true'
我用ngrep验证了加密。我在master节点上安装了ngrep如下
sudo apt-get update
sudo apt-get install ngrep
然后我在任意端口 20001 上 运行 ngrep。
sudo ngrep port 20001
如果您随后 运行 具有以下配置属性的 Spark 作业,您可以看到驱动程序和工作节点之间的加密通信。
spark.driver.port=20001
spark.blockManager.port=20002
注意,我总是建议在 Dataproc 上启用 Kerberos 以保护 Hadoop、Yarn 等的身份验证。这可以通过集群创建命令中的标志 --enable-kerberos
来实现。
首先,我想说的是,我所看到的唯一解决此问题的方法是:Spark 1.6.1 SASL。但是,在添加spark和yarn认证的配置时,还是不行。下面是我在亚马逊 emr 上的纱线集群上使用 spark-submit 的 spark 配置:
SparkConf sparkConf = new SparkConf().setAppName("secure-test");
sparkConf.set("spark.authenticate.enableSaslEncryption", "true");
sparkConf.set("spark.network.sasl.serverAlwaysEncrypt", "true");
sparkConf.set("spark.authenticate", "true");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
try {
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.LongWritable"),
Class.forName("org.apache.hadoop.io.Text")
});
} catch (Exception e) {}
sparkContext = new JavaSparkContext(sparkConf);
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true");
sparkContext.hadoopConfiguration().set("spark.authenticate", "true");
请注意,我在代码中将 spark.authenticate 添加到 sparkContext 的 hadoop 配置中,而不是核心-site.xml(我假设我可以这样做,因为其他事情也能正常工作)。
看这里:https://github.com/apache/spark/blob/master/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java 似乎两个 spark.authenticate 都是必需的。当我 运行 这个应用程序时,我得到以下堆栈跟踪。
17/01/03 22:10:23 INFO storage.BlockManager: Registering executor with local external shuffle service. 17/01/03 22:10:23 ERROR client.TransportClientFactory: Exception while bootstrapping client after 178 ms java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown message type: -22 at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:71) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745)
在 Spark 的文档中,它说
For Spark on YARN deployments, configuring spark.authenticate to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret.
根据上面 yarn 文件中的评论,这似乎是错误的,但是通过故障排除,我仍然不知道应该去哪里让 sasl 工作?我是否遗漏了某处记录的明显内容?
所以我终于想通了。之前的 Whosebug 线程在技术上是正确的。我需要将 spark.authenticate 添加到纱线配置中。也许可以这样做,但我无法弄清楚如何在代码中添加此配置,这在较高层次上是有意义的,为什么会这样。我将 post 我的配置如下,以防将来其他人遇到此问题。
首先,我使用了 aws emr 配置文件(一个例子是使用 aws cli aws emr create-cluster --configurations file://youpathhere.json
)
然后,我在文件中添加了以下内容json:
[{
"Classification": "spark-defaults",
"Properties": {
"spark.authenticate": "true",
"spark.authenticate.enableSaslEncryption": "true",
"spark.network.sasl.serverAlwaysEncrypt": "true"
}
},
{
"Classification": "core-site",
"Properties": {
"spark.authenticate": "true"
}
}]
在为 Spark 网络加密添加配置选项后,我在 Dataproc(Google 云平台)上的 Spark 上收到了相同的错误消息。
我最初使用以下命令创建了 Dataproc 集群。
gcloud dataproc clusters create test-encryption --no-address \
--service-account=<SERVICE-ACCOUNT> \
--zone=europe-west3-c --region=europe-west3 \
--subnet=<SUBNET> \
--properties 'spark:spark.authenticate=true,spark:spark.network.crypto.enabled=true'
解决方案是另外添加配置 'yarn:spark.authenticate=true'
。因此,可以按如下方式创建具有 Spark 的 RPC 加密的工作 Dataproc 集群。
gcloud dataproc clusters create test-encryption --no-address \
--service-account=<SERVICE-ACCOUNT> \
--zone=europe-west3-c --region=europe-west3 \
--subnet=<SUBNET> \
--properties 'spark:spark.authenticate=true,spark:spark.network.crypto.enabled=true,yarn:spark.authenticate=true'
我用ngrep验证了加密。我在master节点上安装了ngrep如下
sudo apt-get update
sudo apt-get install ngrep
然后我在任意端口 20001 上 运行 ngrep。
sudo ngrep port 20001
如果您随后 运行 具有以下配置属性的 Spark 作业,您可以看到驱动程序和工作节点之间的加密通信。
spark.driver.port=20001
spark.blockManager.port=20002
注意,我总是建议在 Dataproc 上启用 Kerberos 以保护 Hadoop、Yarn 等的身份验证。这可以通过集群创建命令中的标志 --enable-kerberos
来实现。