一旦写入最终完成,如何处理 HDFS 目录中的新文件?
How to process new files in HDFS directory once their writing has eventually finished?
在我的场景中,我有连续上传到 HDFS 的 CSV 文件。
一旦上传了新文件,我想用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为 parquet
).即我在每个输入文件和 transformed/processed 输出文件之间有一个一对一的映射。
我正在评估 Spark Streaming 以侦听 HDFS 目录,然后使用 Spark 处理 "streamed file"。
但是,为了处理整个文件,我需要知道 "file stream" 何时完成。我想将转换应用于整个文件,以保留文件之间端到端的一对一映射。
如何转换整个文件而不是其微批次?
据我所知,Spark Streaming 只能对批处理(DStreams
映射到 RDDs
)应用转换,而不能一次对整个文件应用转换(当其有限流完成时)。
对吗?如果是这样,我应该为我的场景考虑什么替代方案?
您可以使用 DFSInotifyEventInputStream 查看 Hadoop 目录,然后在创建文件时以编程方式执行 Spark 作业。
看到这个post:
第一次尝试时我可能误解了你的问题...
As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).
Is that correct?
没有。那是不正确的。
Spark Streaming 将在 Spark Streaming 的批处理间隔结束时立即对整个文件应用转换,就像写入 HDFS 一样。
Spark Streaming 将获取文件的当前内容并开始处理它。
As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL
Spark 几乎 不可能,因为它的架构从“上传”到 Spark 处理它需要一些时间。
您应该考虑使用全新的闪亮 Structured Streaming or (soon obsolete) Spark Streaming。
两种解决方案都支持监视目录中的新文件并在上传新文件后触发 Spark 作业(这正是您的用例)。
引用结构化流的 Input Sources:
In Spark 2.0, there are a few built-in sources.
- File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
另见 Spark Streaming 的 Basic Sources:
Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).
请注意您的要求:
I would need to know when the "file stream" completes.
不要用 Spark 这样做。
再次引用 Spark Streaming 的 Basic Sources:
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
总结...您应该仅 将文件移动到 Spark 在文件完成并准备好使用 Spark 进行处理时监视的目录。这超出了 Spark 的范围。
在我的场景中,我有连续上传到 HDFS 的 CSV 文件。
一旦上传了新文件,我想用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为 parquet
).即我在每个输入文件和 transformed/processed 输出文件之间有一个一对一的映射。
我正在评估 Spark Streaming 以侦听 HDFS 目录,然后使用 Spark 处理 "streamed file"。
但是,为了处理整个文件,我需要知道 "file stream" 何时完成。我想将转换应用于整个文件,以保留文件之间端到端的一对一映射。
如何转换整个文件而不是其微批次?
据我所知,Spark Streaming 只能对批处理(DStreams
映射到 RDDs
)应用转换,而不能一次对整个文件应用转换(当其有限流完成时)。
对吗?如果是这样,我应该为我的场景考虑什么替代方案?
您可以使用 DFSInotifyEventInputStream 查看 Hadoop 目录,然后在创建文件时以编程方式执行 Spark 作业。
看到这个post:
第一次尝试时我可能误解了你的问题...
As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).
Is that correct?
没有。那是不正确的。
Spark Streaming 将在 Spark Streaming 的批处理间隔结束时立即对整个文件应用转换,就像写入 HDFS 一样。
Spark Streaming 将获取文件的当前内容并开始处理它。
As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL
Spark 几乎 不可能,因为它的架构从“上传”到 Spark 处理它需要一些时间。
您应该考虑使用全新的闪亮 Structured Streaming or (soon obsolete) Spark Streaming。
两种解决方案都支持监视目录中的新文件并在上传新文件后触发 Spark 作业(这正是您的用例)。
引用结构化流的 Input Sources:
In Spark 2.0, there are a few built-in sources.
- File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
另见 Spark Streaming 的 Basic Sources:
Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).
请注意您的要求:
I would need to know when the "file stream" completes.
不要用 Spark 这样做。
再次引用 Spark Streaming 的 Basic Sources:
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
总结...您应该仅 将文件移动到 Spark 在文件完成并准备好使用 Spark 进行处理时监视的目录。这超出了 Spark 的范围。