How do I set FTP passive mode in Spark to read a file from an FTP server?
I am reading a file from an FTP server into a Spark RDD, like this:
val rdd = spark.sparkContext.textFile("ftp://anonymous:pwd@<hostname>/data.gz")
rdd.count
...
This actually works when I run the Spark application from my local machine (a Mac), but when I try to run it from a Docker container (running on the Mac), I get the following exception:
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:313)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:290)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:479)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:552)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:601)
at org.apache.commons.net.ftp.FTP.quit(FTP.java:809)
at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:979)
at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:168)
at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:415)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:626)
at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:99)
at org.apache.spark.rdd.RDD.$anonfun$preferredLocations(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:300)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2098)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2072)
at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1794)
at org.apache.spark.rdd.DefaultPartitionCoalescer.currPrefLocs(CoalescedRDD.scala:180)
at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.$anonfun$getAllPrefLocs(CoalescedRDD.scala:198)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.getAllPrefLocs(CoalescedRDD.scala:197)
at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.<init>(CoalescedRDD.scala:190)
at org.apache.spark.rdd.DefaultPartitionCoalescer.coalesce(CoalescedRDD.scala:391)
at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:90)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
at com.mypackage.Myapp$.parseData(Myapp.scala:76)
Inside the container, even the ftp command-line utility has the same problem. However, after turning on passive mode in the ftp CLI, I was able to transfer the file from the FTP server to the container successfully:
ftp <host>
...
ftp> passive
Passive mode on.
ftp> get data.gz
227 Entering Passive Mode ...
226 Transfer complete
20676672 bytes received in 25.53 secs (790.9552 kB/s)
So my question is... how do I set the passive mode property when reading the file in Spark with spark.sparkContext.textFile("ftp://anonymous:pwd@<hostname>/data.gz")?
I have no experience with Spark, so I don't know how it hooks into Hadoop. But in Hadoop you can enable FTP passive mode by setting the fs.ftp.data.connection.mode configuration option:
fs.ftp.data.connection.mode=PASSIVE_LOCAL_DATA_CONNECTION_MODE
You need at least Hadoop 2.9: https://issues.apache.org/jira/browse/HADOOP-13953
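For reference, here is a minimal sketch (not from the original answer) of how that Hadoop option could be applied from a Spark application. It assumes Spark forwards the setting to Hadoop's FTPFileSystem through the SparkContext's Hadoop configuration (or, equivalently, through the spark.hadoop.* config prefix); the hostname and credentials are the placeholders from the question.

import org.apache.spark.sql.SparkSession

object FtpPassiveRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ftp-passive-read")
      // Equivalent alternative: any "spark.hadoop.*" property is copied into
      // the Hadoop configuration, e.g.
      // .config("spark.hadoop.fs.ftp.data.connection.mode",
      //         "PASSIVE_LOCAL_DATA_CONNECTION_MODE")
      .getOrCreate()

    // Ask Hadoop's FTPFileSystem to open data connections in passive mode
    // (supported from Hadoop 2.9 onwards, per HADOOP-13953).
    spark.sparkContext.hadoopConfiguration
      .set("fs.ftp.data.connection.mode", "PASSIVE_LOCAL_DATA_CONNECTION_MODE")

    // <hostname> is the placeholder from the question; substitute the real host.
    val rdd = spark.sparkContext.textFile("ftp://anonymous:pwd@<hostname>/data.gz")
    println(rdd.count())

    spark.stop()
  }
}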