Flink SlidingEventTimeWindows 没有按预期工作

Flink SlidingEventTimeWindows doesnt work as expected

我将流执行配置为

object FlinkSlidingEventTimeExample extends App {
    case class Trx(timestamp:Long, id:String, trx:String, count:Int)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val watermarkS1 = WatermarkStrategy
      .forBoundedOutOfOrderness[Trx](Duration.ofSeconds(15))
      .withTimestampAssigner(new SerializableTimestampAssigner[Trx] {
          override def extractTimestamp(element: Trx, recordTimestamp: Long): Long = element.timestamp
      })
    val s1 = env.socketTextStream("localhost", 9999)
      .flatMap(l => l.split(" "))
      .map(l => Trx(timestamp = l.split(",")(0).toLong, id = l.split(",")(1), trx = l.split(",")(2), count = 1))
      .assignTimestampsAndWatermarks(watermarkS1)
      .keyBy(l => l.id)
      .window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5))) // Not working
      //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))) // Working
      .sum("count")
      .print
    env.execute("FlinkSlidingEventTimeExample")
}

我已经定义了一个水印,但不明白为什么它没有产生任何东西。有人有什么想法吗?我的flink版本是1.14.0

我的 build.sbt 如下所示:

scalaVersion := "2.12.15"



libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-runtime-web" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0"
libraryDependencies += "org.apache.flink" % "flink-queryable-state-runtime" % "1.14.0"

我正在从套接字(端口:9999)输入输入数据,如下所示:

1640375790000,1,trx1
1640375815000,1,trx2
1640375841000,1,trx3
1640375741000,1,trx4

试图提供比 window 大小更大的时间戳,但仍然无效。

Flink Web UI 截图: web-ui watermarks

之前的回答已删除;它基于对设置的错误假设。

当事件时间 windows 无法产生结果时,它总是与水印有关。

您输入的时间戳对应

December 24, 2021 19:56:30
December 24, 2021 19:56:55
December 24, 2021 19:57:21
December 24, 2021 19:55:41

所以有足够的数据来触发几个滑动的关闭windows。例如,trx2 有一个足够大的时间戳,它可以生成一个足够大的水印来关闭这些包含 19:56:30:

的 windows
19:56:15 - 19:56:34.999
19:56:20 - 19:56:39.999

但是,您的执行图看起来像这样:

问题是套接字源和后续任务(执行平面图 -> 地图 -> 水印的任务)之间的重新平衡。您的四个事件中的每一个都将发送到水印策略的不同实例,并且一些实例没有接收到任何事件。这就是为什么没有生成水印的原因。

你想要做的是将输入解析和水印生成链接到源以相同的并行度,这样你的执行图看起来像这样:

此代码将执行此操作:

env
  .socketTextStream("localhost", 9999)
  .map(l => {
    val input = l.split(",")
    Trx(timestamp = input(0).toLong, id = input(1), trx = input(2), count = 1)
  })
  .setParallelism(1)
  .assignTimestampsAndWatermarks(watermarkS1)
  .setParallelism(1)
  .keyBy(l => l.id)
  .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
  .sum("count")
  .print

一般来说,没有必要以 1 的并行度进行水印,但水印生成器的每个实例都必须有足够的事件来处理,或者配置为 withIdleness。 (如果每个实例都空闲,那么您也不会得到任何结果。)