如何使用 flink 打印文件中的总行数
How to print the total number of lines in files using flink
我正在从 parquet 读取行,因为我正在使用类似于 的源函数,但是当我尝试计算正在处理的行数时,尽管作业已完成,但没有打印任何内容:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
lazy val stream: DataStream[Group] = env.addSource(new ParquetSourceFunction)
stream.map(_ => 1)
.timeWindowAll(Time.seconds(180))
.reduce( _ + _).print()
问题是你正在使用 ProcessingTime
,所以基本上每当你使用 EventTime
文件完成时,Flink 都会发出一个带有 Long.Max
值的水印,所以所有 windows 都已关闭,但是在使用 ProcessingTime
时不会发生这种情况,所以简单地说 Flink 不会等待您的 window 关闭,这就是为什么您没有得到任何有价值的东西结果。
您可能想尝试切换到 DataSet
API,这应该更适合您要完成的任务。
或者,您可以尝试使用 EventTime
并分配静态 Watermark,因为 Flink 最后仍会发出具有 Long.Max
值的水印。
我正在从 parquet 读取行,因为我正在使用类似于
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
lazy val stream: DataStream[Group] = env.addSource(new ParquetSourceFunction)
stream.map(_ => 1)
.timeWindowAll(Time.seconds(180))
.reduce( _ + _).print()
问题是你正在使用 ProcessingTime
,所以基本上每当你使用 EventTime
文件完成时,Flink 都会发出一个带有 Long.Max
值的水印,所以所有 windows 都已关闭,但是在使用 ProcessingTime
时不会发生这种情况,所以简单地说 Flink 不会等待您的 window 关闭,这就是为什么您没有得到任何有价值的东西结果。
您可能想尝试切换到 DataSet
API,这应该更适合您要完成的任务。
或者,您可以尝试使用 EventTime
并分配静态 Watermark,因为 Flink 最后仍会发出具有 Long.Max
值的水印。