Storing parquet to Kerberos secured Webhdfs from Spark
I am writing from Spark to a webhdfs path that is secured with Kerberos.
Part of it actually works, but it blows up when writing the parquet files to the (web)hdfs location.
Authentication and authorization work, and the script creates the path structure needed to store the parquet files, but as soon as the actual writing starts, the OutputStreams begin to fail.
The Spark config looks like this (I made it a bit more verbose):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

// Hadoop-side configuration: point fs.defaultFS at the WebHDFS endpoint
// and switch on Kerberos authentication/authorization.
val hadoopConfig = new Configuration()
hadoopConfig.set(FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
hadoopConfig.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
hadoopConfig.set(HADOOP_SECURITY_AUTHORIZATION, "true")

// Log in from the keytab before any filesystem access happens.
UserGroupInformation.setConfiguration(hadoopConfig)
UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

// Mirror the same settings on the SparkConf; the "spark.hadoop." prefix
// is copied into the Hadoop configuration used by the executors.
new SparkConf()
  .setIfMissing("spark.master", "local[*]")
  .set("spark.yarn.keytab", "path/to/keytab.keytab")
  .set("spark.yarn.principal", "principal@REALM")
  .set("spark.hadoop." + FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHORIZATION, "true")
When I use spark.write.parquet (or .text), it does create the mentioned path (e.g. with target /user/tom/dump/2018/06/11 it does create that directory path on hdfs), but when the job gets to the point of storing the actual data, it fails. The write itself is nothing special, roughly (a sketch; df stands in for my actual DataFrame):
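// hypothetical write that triggers the failure; df is a placeholder DataFrame
df.write.parquet("/user/tom/dump/2018/06/11")

The error: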
...
11:43:56,668 INFO TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 19, localhost, executor driver, partition 0, ANY, 7754 bytes)
11:43:56,668 INFO Executor:54 - Running task 0.0 in stage 1.0 (TID 19)
11:43:56,674 INFO deprecation:1129 - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
11:43:56,702 INFO ShuffleBlockFetcherIterator:54 - Getting 19 non-empty blocks out of 19 blocks
11:43:56,703 INFO ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 3 ms
11:43:56,713 INFO SQLHadoopMapReduceCommitProtocol:54 - Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
11:43:57,016 ERROR Utils:91 - Aborting task
java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
11:43:57,186 WARN FileOutputCommitter:467 - Could not delete webhdfs://hadoop-cm001.ix5.shared.prod.st.ecg.so:14000/user/mim_dev/cdata/user_profiling_dump/cdata/2018/06/11/_temporary/0/_temporary/attempt_20180611114356_0001_m_000000_0
11:43:57,187 ERROR FileFormatWriter:70 - Job job_20180611114356_0001 aborted.
11:43:57,188 WARN Utils:87 - Suppressing exception in catch: Unauthorized
org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access0(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
11:43:57,190 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 19)
org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access0(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
... 9 more
11:43:57,199 WARN TaskSetManager:66 - Lost task 0.0 in stage 1.0 (TID 19, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access0(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
... 9 more
11:43:57,200 ERROR TaskSetManager:70 - Task 0 in stage 1.0 failed 1 times; aborting job
11:43:57,201 INFO TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool default
11:43:57,203 INFO TaskSchedulerImpl:54 - Cancelling stage 1
...
So the hints from @Samson-Sharfrichter got me looking into how Spark handles these connections internally, and I stumbled upon an (IMHO not very well documented) feature: SparkHadoopUtil.
After logging in with UserGroupInformation.loginUserFromKeytab and setting up the SparkConf, I use this utility to add the credentials, like so:
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

val credentials = UserGroupInformation.getLoginUser.getCredentials
SparkHadoopUtil.get.addCurrentUserCredentials(credentials)
This actually makes sure that every job has access to the proper credentials.
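For reference, the whole working sequence then looks roughly like this (a sketch under the same assumptions as above: principal, keytab, endpoint and target path are placeholders, and addCurrentUserCredentials is the Spark 2.x SparkHadoopUtil API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession

// 1. Kerberos login from the keytab, exactly as in the question.
val hadoopConfig = new Configuration()
hadoopConfig.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
hadoopConfig.set(HADOOP_SECURITY_AUTHORIZATION, "true")
UserGroupInformation.setConfiguration(hadoopConfig)
UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

// 2. The missing piece: hand the login user's credentials to Spark.
SparkHadoopUtil.get.addCurrentUserCredentials(
  UserGroupInformation.getLoginUser.getCredentials)

// 3. Build the session and write; the tasks now see the credentials.
val sparkConf = new SparkConf()
  .setIfMissing("spark.master", "local[*]")
  .set("spark.hadoop." + FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHORIZATION, "true")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()

val df = spark.range(10).toDF("id")  // toy data just to exercise the write
df.write.parquet("/user/tom/dump/2018/06/11")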