AWS EMR:文件存在但错误提示文件不存在

AWS EMR: File exists but error says file does not exist

我使用 Release label:emr-6.2.0 创建了一个 AWS EMR Spark 集群 Hadoop distribution:Amazon Applications:Spark 3.0.1, Zeppelin 0.9.0 并将我所有的本地文件(.jars、.py、.csv 和 sas7bdat)复制到集群主机

当我做的时候

[hadoop@ip-172-31-22-207 ~]$ ls -al /home/hadoop/sas_data1/
total 1071812
rwxrwxr-x 2 hadoop hadoop 66 Sep 13 04:08 .
drwxr-xr-x 7 hadoop hadoop 4096 Sep 13 04:38 ..
-rw-r--r-- 1 hadoop hadoop 471990272 Sep 13 04:07 file1.sas7bdat
-rw-r--r-- 1 hadoop hadoop 625541120 Sep 13 04:08 file2.sas7bdat

输出显示文件存在。另外,在我的程序中 /home/hadoop,

def process_raw_data(inputs, output):
    spark = SparkSession.builder.\
        config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12").\
        enableHiveSupport().getOrCreate()
    sas_dir = f'{os.getcwd()}/sas_data1'
    for filename in os.listdir(f"{sas_dir}"):
        extension = os.path.splitext(filename)[1]
        print("!!!!!!!!!!",f'{sas_dir}/{filename}')
        df_spark = spark.read.format('com.github.saurfang.sas.spark').\
            load(f'{sas_dir}/{filename}')
        raw_df = df_spark.select('field1','field2')
        raw_df.write.mode('append').parquet(output + '/raw_data_output')

我正在遍历 sas_data1 目录中的文件,在输出中它正确地将文件名显示为 !!!!!!!!!! /home/hadoop/sas_data1/file1.sas7bdat,这只有在文件存在时才有可能。但是我收到文件不存在的错误。我运行以下命令;

spark-submit --jars parso-2.0.11.jar,spark-sas7bdat-3.0.0-s_2.12.jar,hadoop-aws-2.7.4.jar,aws-java-sdk-1.7.4.jar --master yarn process_raw_files.py

File "/home/hadoop/process_raw_files.py", line 112, in <module>
    output_bucket % 'raw_immigration_output')
  File "/home/hadoop/process_raw_files.py", line 21, in process_raw_immigration
    load(f'{sas_dir}/{filename}')
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 178, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.io.FileNotFoundException: File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:866)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:842)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1010)
    at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:319)
    at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:315)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:327)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:181)
    at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:73)
    at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:45)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
    at org.apache.hadoop.ipc.Client.call(Client.java:1491)
    at org.apache.hadoop.ipc.Client.call(Client.java:1388)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
    at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
    ... 31 more

我将文件存储在EMR Cluster master 的正常存储中。为什么调试会显示文件名但错误提示文件不存在?是不是和我没有复制文件到的worker节点有关?

Spark 将在 HDFS 上查找文件。将文件复制到 HDFS 并重新运行作业。