为什么 DataFrame.saveAsTable("df") 将 table 保存到不同的 HDFS 主机?

Why does DataFrame.saveAsTable("df") save table to different HDFS host?

我已经用 Spark (1.4.0) 配置了 Hive (1.13.1),我可以从 hive 访问所有数据库和 Table,我的仓库目录是 hdfs://192.168.1.17:8020/user/hive/warehouse

但是,当我尝试使用 df.saveAsTable("df") 函数通过 Spark-Shell(使用 master)将 Dataframe 保存到 Hive 中时,我收到了这个错误。

15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_database: default   
15/07/03 14:48:59 INFO HiveMetaStore: 0: get_table : db=default tbl=df
15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_table : db=default tbl=df   
java.net.ConnectException: Call From bdiuser-Vostro-3800/127.0.1.1 to 192.168.1.19:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
    at org.apache.hadoop.ipc.Client.call(Client.java:1414)
    at org.apache.hadoop.ipc.Client.call(Client.java:1363)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
    at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1124)
    at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:78)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
    at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:239)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:88)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1517)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC.<init>(<console>:37)
    at $iwC.<init>(<console>:39)
    at <init>(<console>:41)
    at .<init>(<console>:45)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699)
    at org.apache.hadoop.ipc.Client$Connection.access00(Client.java:367)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    ... 86 more

当我遇到这个错误时,我发现程序尝试使用不同的主机进行 HDFS 连接以保存 table。

我也试过不同工人的火花-shell,我得到了同样的错误。

使用 saveAsTable,Spark 保存到的默认位置由 HiveMetastore 控制(基于文档)。另一种选择是使用 saveAsParquetFile 并指定路径,然后将该路径注册到您的配置单元 Metastore 或使用新的 DataFrameWriter 接口并指定路径选项 write.format(source).mode(mode).options(options).saveAsTable(tableName).

请查找以下示例:

val options = Map("path" -> hiveTablePath)
result.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable(hiveTable)

我有explained this a little bit more in my blog.

您可以将 spark dataframe 写入现有的 spark table。

请查找以下示例:

df.write.mode("overwrite").saveAsTable("database.tableName")