Can't read files via FTP using SparkContext.textFile(...) on Google Dataproc
I'm running a Spark cluster on Google Dataproc and I'm having trouble reading a GZip file from FTP using sparkContext.textFile(...).
The code I'm running is:
import org.apache.spark.{SparkConf, SparkContext}

object SparkFtpTest extends App {
  // SparkContext setup (assumed; the snippet uses sc without showing how it is created)
  val sc = new SparkContext(new SparkConf().setAppName("SparkFtpTest"))
  val file = "ftp://username:password@host:21/filename.txt.gz"
  val lines = sc.textFile(file)
  lines.saveAsTextFile("gs://my-bucket-storage/tmp123")
}
The error I get is:
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
I've seen suggestions that this error means the credentials are wrong, so I tried deliberately entering incorrect credentials, and the error was different: it complained about invalid login credentials.
The URL also works if I paste it into a browser: the file downloads correctly.
It's also worth mentioning that I tried using the Apache commons-net library directly (the same version Spark uses, 2.2) and it worked: I was able to stream the data, from both the master and the worker nodes. I couldn't decompress it, though (using Java's GZipInputStream; I don't remember how it failed, but I can try to reproduce it if you think it matters). I think this shows it isn't a firewall issue on the cluster, although I wasn't able to download the file with curl.
I believe I ran the same code on my local machine a few months ago, and if I remember correctly it worked fine.
Do you have any idea what's causing this?
Could it be some kind of dependency conflict, and if so, which one?
I have several dependencies in the project, e.g. google-sdk, solrj, ... However, if it were a dependency issue I would expect to see a ClassNotFoundException or NoSuchMethodError.
The whole stack trace looks like this:
16/12/05 23:53:46 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:47 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
16/12/05 23:53:49 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/0/
16/12/05 23:53:50 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/0/ - removing from cache
16/12/05 23:53:50 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:51 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:298)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:495)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:537)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:586)
at org.apache.commons.net.ftp.FTP.quit(FTP.java:794)
at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:788)
at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:151)
at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:395)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1219)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:1064)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:956)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply$mcV$sp(RDD.scala:1459)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1438)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1438)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438)
It looks like this may be a known, unresolved issue in Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-11886 and https://github.com/databricks/learning-spark/issues/21 both mention similar stack traces.
Since you were able to use the Apache commons-net library manually, you can get the same effect as sc.textFile by obtaining the list of file names, parallelizing that list into an RDD, and then using flatMap, where each task takes a file name and reads that file line by line, producing the collection of output lines for each file.
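A minimal sketch of that approach (not code from the original answer), assuming gzip-compressed text files and a commons-net FTPClient; the host, credentials, and file names are placeholders and error handling is omitted:

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream
import org.apache.commons.net.ftp.{FTP, FTPClient}

// Hypothetical list of remote file names; in practice you could list them
// on the driver with FTPClient.listNames before parallelizing.
val fileNames = Seq("filename1.txt.gz", "filename2.txt.gz")

val lines = sc.parallelize(fileNames).flatMap { name =>
  val ftp = new FTPClient()
  ftp.connect("host")
  ftp.login("username", "password")
  ftp.enterLocalPassiveMode()
  ftp.setFileType(FTP.BINARY_FILE_TYPE)
  val in = ftp.retrieveFileStream(name)
  val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in)))
  // Materialize the lines so the connection can be closed before the task returns.
  val result = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
  reader.close()
  ftp.completePendingCommand()
  ftp.logout()
  ftp.disconnect()
  result
}
lines.saveAsTextFile("gs://my-bucket-storage/tmp123")

Each task opens its own FTP connection, so the parallelism is bounded by the number of files (and by how many concurrent connections the FTP server allows).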
Alternatively, if the amount of data on the FTP server is small (maybe up to 10 GB or so), a single copy from your FTP server into HDFS or GCS on the Dataproc cluster, followed by processing the HDFS or GCS path in your Spark job, should serve you just as well as reading in parallel.
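For that route, a rough sketch (again with placeholder host, credentials, and paths) could pull the file once on the driver with commons-net and write it to GCS through the Hadoop FileSystem API; sc.textFile then reads it as usual and decompresses the .gz file transparently:

import java.net.URI
import org.apache.commons.net.ftp.{FTP, FTPClient}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

val ftp = new FTPClient()
ftp.connect("host")
ftp.login("username", "password")
ftp.enterLocalPassiveMode()
ftp.setFileType(FTP.BINARY_FILE_TYPE)
val in = ftp.retrieveFileStream("filename.txt.gz")

// Write the stream to GCS using the cluster's Hadoop configuration
// (the GCS connector is preinstalled on Dataproc).
val gcsPath = new Path("gs://my-bucket-storage/input/filename.txt.gz")
val fs = FileSystem.get(new URI("gs://my-bucket-storage/"), sc.hadoopConfiguration)
val out = fs.create(gcsPath)
IOUtils.copyBytes(in, out, 4096, true)  // closes both streams when finished
ftp.completePendingCommand()
ftp.disconnect()

val lines = sc.textFile(gcsPath.toString)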