Can't read files via FTP using SparkContext.textFile(...) on Google Dataproc

I'm running a Spark cluster on Google Dataproc and I'm having problems trying to read a gzipped file from FTP with sparkContext.textFile(...).

The code I'm running is:

import org.apache.spark.{SparkConf, SparkContext}

object SparkFtpTest extends App {
  // create the SparkContext referred to as sc below
  val sc = new SparkContext(new SparkConf().setAppName("SparkFtpTest"))
  val file = "ftp://username:password@host:21/filename.txt.gz"
  val lines = sc.textFile(file)
  lines.saveAsTextFile("gs://my-bucket-storage/tmp123")
}

The error I'm getting is:

Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.

I've seen suggestions that this error means the credentials are wrong, so I tried deliberately entering incorrect credentials, but then the error was different, i.e. invalid login credentials.

The URL also works if I copy it into a browser - the file downloads correctly.

It's also worth mentioning that I've tried using the Apache commons-net library directly (the same version Spark uses - 2.2) and it worked - I was able to stream the data (from both the master and the worker nodes). I couldn't decompress it, though (using Java's GZIPInputStream; I don't remember the failure, but I can try to reproduce it if you think it matters). I think this shows it's not some firewall problem on the cluster, even though I wasn't able to download the file with curl.
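For reference, the manual commons-net test looked roughly like the sketch below (host, credentials and the file path are placeholders; the commented-out GZIPInputStream line is where the decompression attempt failed):

import org.apache.commons.net.ftp.{FTP, FTPClient}

object FtpStreamTest extends App {
  val ftp = new FTPClient()
  ftp.connect("host", 21)
  ftp.login("username", "password")
  ftp.enterLocalPassiveMode()             // open the data connection in passive mode
  ftp.setFileType(FTP.BINARY_FILE_TYPE)   // don't mangle the gzip bytes

  val in = ftp.retrieveFileStream("/filename.txt.gz")
  // val gz = new java.util.zip.GZIPInputStream(in)   // the decompression step that failed
  val buffer = new Array[Byte](8192)
  val read = in.read(buffer)              // streaming the raw bytes works
  println(s"read $read bytes")

  in.close()
  ftp.logout()
  ftp.disconnect()
}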

I believe I ran the same code on my local machine a couple of months ago and, if I remember correctly, it worked fine.

Do you have any idea what's causing this? Could it be some kind of dependency conflict, and if so, which one?

I have a few dependencies in the project, e.g. google-sdk, solrj, ... However, I would expect a ClassNotFoundException or NoSuchMethodError if it were a dependency problem.

The whole stack trace looks like this:

16/12/05 23:53:46 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:47 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
16/12/05 23:53:49 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/0/
16/12/05 23:53:50 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/0/ - removing from cache
16/12/05 23:53:50 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/
16/12/05 23:53:51 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:298)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:495)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:537)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:586)
    at org.apache.commons.net.ftp.FTP.quit(FTP.java:794)
    at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:788)
    at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:151)
    at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:395)
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply$mcV$sp(PairRDDFunctions.scala:1219)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset.apply(PairRDDFunctions.scala:1161)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:1064)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply$mcV$sp(PairRDDFunctions.scala:956)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:956)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile.apply(PairRDDFunctions.scala:956)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply$mcV$sp(RDD.scala:1459)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1438)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile.apply(RDD.scala:1438)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438)

It looks like this may be a known, unresolved issue in Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-11886 and https://github.com/databricks/learning-spark/issues/21 both allude to similar stack traces.

If you were able to use the Apache commons-net library manually, you can get the same effect as sc.textFile by obtaining the list of files, parallelizing that list of filenames into an RDD, and then using flatMap, where each task takes a filename and reads the file line by line, producing the output collection of lines for each file, as sketched below.
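A minimal sketch of that approach, assuming commons-net is available on the workers; the host, credentials and file names are placeholders:

import java.util.zip.GZIPInputStream
import scala.io.Source
import org.apache.commons.net.ftp.{FTP, FTPClient}

val fileNames = Seq("/filename1.txt.gz", "/filename2.txt.gz")  // list obtained up front, e.g. via FTPClient.listNames

val lines = sc.parallelize(fileNames).flatMap { name =>
  // Each task opens its own FTP connection and streams one file.
  val ftp = new FTPClient()
  ftp.connect("host", 21)
  ftp.login("username", "password")
  ftp.enterLocalPassiveMode()
  ftp.setFileType(FTP.BINARY_FILE_TYPE)

  val in = new GZIPInputStream(ftp.retrieveFileStream(name))
  val fileLines = Source.fromInputStream(in).getLines().toList  // materialize before closing the stream
  in.close()
  ftp.logout()
  ftp.disconnect()
  fileLines
}

lines.saveAsTextFile("gs://my-bucket-storage/tmp123")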

Alternatively, if the amount of data on your FTP server is small (maybe up to 10 GB or so), it won't be much slower than a parallel read to simply copy it in a single thread from your FTP server into HDFS or GCS on your Dataproc cluster, and then process the HDFS or GCS path in your Spark job.
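A single-threaded copy could look roughly like the sketch below, assuming the GCS connector is configured on the cluster (as it is on Dataproc); the credentials and the staging path under gs://my-bucket-storage are placeholders:

import java.net.URI
import org.apache.commons.net.ftp.{FTP, FTPClient}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

val ftp = new FTPClient()
ftp.connect("host", 21)
ftp.login("username", "password")
ftp.enterLocalPassiveMode()
ftp.setFileType(FTP.BINARY_FILE_TYPE)

val in  = ftp.retrieveFileStream("/filename.txt.gz")
val fs  = FileSystem.get(new URI("gs://my-bucket-storage"), new Configuration())
val out = fs.create(new Path("gs://my-bucket-storage/staging/filename.txt.gz"))
IOUtils.copyBytes(in, out, 4096, true)   // copies and closes both streams
ftp.logout(); ftp.disconnect()

// textFile handles the .gz transparently when reading it back
val lines = sc.textFile("gs://my-bucket-storage/staging/filename.txt.gz")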