Processing Parquet files continuously as DataStreams in Flink's DataStream API
I have a Parquet file on HDFS. It is overwritten daily with a new one. My goal is to continuously emit this Parquet file - whenever it changes - as a DataStream in a Flink job, using the DataStream API.
The end goal is to use the file content in a broadcast state, but that is out of scope for this question.
- To process a file continuously, the Data Sources section of the API docs is very useful, more specifically FileProcessingMode.PROCESS_CONTINUOUSLY: this is exactly what I need. It works fine for reading/monitoring text files, but not for Parquet files:
// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)
- To process Parquet files, I can use a Hadoop input format with this API: using-hadoop-inputformats. But that API has no FileProcessingMode parameter and processes the file only once:
// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// Parquet input format, read through Hive's MapredParquetInputFormat
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val stream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat).map { record =>
  // process the record here ...
  record
}
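For illustration, here is a minimal sketch of a concrete map over that input, assuming the first projected column is an INT32 that the Hive read support materializes as an IntWritable (the Writable types inside the ArrayWritable depend on your schema, so treat this as a sketch rather than a reference):
import org.apache.hadoop.io.{IntWritable, Writable}

// hypothetical: map each (Void, ArrayWritable) record to the value of its first column
val firstColumnStream: DataStream[Int] = streamExecutionEnvironment
  .createInput(hadoopInputFormat)
  .map { record =>
    val columns: Array[Writable] = record._2.get() // one Writable per projected column
    columns(0).asInstanceOf[IntWritable].get()
  }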
I would like to somehow combine these two APIs, so that the Parquet file is processed continuously through the DataStream API. Have any of you tried something like this?
Having looked through Flink's code, the two APIs are quite different and it does not look possible to merge them: readFile expects a FileInputFormat, which the HadoopInputFormat wrapper returned by HadoopInputs.readHadoopFile is not.
The alternative approach I will detail here is to define your own SourceFunction that periodically reads the file:
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[Int] {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    while (isRunning) {
      val path = new Path("path_to_parquet_file")
      val conf = new Configuration()
      // read the footer to obtain the file metadata and the schema
      val readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
      val metadata = readFooter.getFileMetaData
      val schema = metadata.getSchema
      val parquetFileReader = new ParquetFileReader(conf, metadata, path, readFooter.getBlocks, schema.getColumns)
      var pages: PageReadStore = null
      try {
        // iterate over the row groups and emit one record per row
        while ({ pages = parquetFileReader.readNextRowGroup(); pages != null }) {
          val rows = pages.getRowCount
          val columnIO = new ColumnIOFactory().getColumnIO(schema)
          val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
          (0L until rows).foreach { _ =>
            val group = recordReader.read()
            val my_integer = group.getInteger("field_name", 0)
            ctx.collect(my_integer)
          }
        }
      } finally {
        parquetFileReader.close()
      }
      // do whatever logic suits you to stop "watching" the file
      Thread.sleep(60000)
    }
  }

  override def cancel(): Unit = isRunning = false
}
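As written, the loop re-reads and re-emits the whole file every 60 seconds, even if it has not been overwritten. Since the goal is to emit only when the file changes, one simple option is to remember the file's modification time and skip the read when it has not advanced. Here is a sketch, assuming the path is reachable with a default Hadoop Configuration; the helper class name is just illustrative:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// hypothetical helper: returns true only when the file's modification time has advanced
class ModificationWatcher(pathStr: String) {
  private val conf = new Configuration()
  private val path = new Path(pathStr)
  private var lastSeen: Long = -1L

  def hasChanged(): Boolean = {
    val fs = FileSystem.get(path.toUri, conf)
    val modTime = fs.getFileStatus(path).getModificationTime
    if (modTime > lastSeen) { lastSeen = modTime; true } else false
  }
}
Inside run, you would call hasChanged() right before the Parquet read and only parse the file when it returns true.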
Then, register this source with the streamExecutionEnvironment:
val dataStream: DataStream[Int] = streamExecutionEnvironment.addSource(new ParquetSourceFunction)
// do what you want with your new datastream
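Note that a plain SourceFunction like this one is a non-parallel source, so Flink runs it with parallelism 1; that is what you want here, since every parallel instance would otherwise read and emit the same file.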