如何使用 flink 打印文件中的总行数

How to print the total number of lines in files using flink

我正在从 parquet 读取行,因为我正在使用类似于 的源函数,但是当我尝试计算正在处理的行数时,尽管作业已完成,但没有打印任何内容:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
lazy val stream: DataStream[Group] = env.addSource(new ParquetSourceFunction)
stream.map(_ => 1)                    
    .timeWindowAll(Time.seconds(180)) 
    .reduce( _ + _).print() 

问题是你正在使用 ProcessingTime,所以基本上每当你使用 EventTime 文件完成时,Flink 都会发出一个带有 Long.Max 值的水印,所以所有 windows 都已关闭,但是在使用 ProcessingTime 时不会发生这种情况,所以简单地说 Flink 不会等待您的 window 关闭,这就是为什么您没有得到任何有价值的东西结果。

您可能想尝试切换到 DataSet API,这应该更适合您要完成的任务。

或者,您可以尝试使用 EventTime 并分配静态 Watermark,因为 Flink 最后仍会发出具有 Long.Max 值的水印。