在 Apache Flink 中于转换之后触发全局 Window

Global Window Trigger after a transformation in Apache Flink

我正在尝试在 Flink 中实现一个 window 触发器:当平均值高于阈值时,它就会触发。

流式数据的每条记录包含学生姓名和分数,以逗号(,)分隔。只要某个学生的平均分数超过 90,就必须触发 window,无论其尝试次数是多少。

示例数据:

Fred,88
Fred,91
Wilma,93
.
.

当前Flink代码:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}

/**
  * A student's mark record as it flows through the stream.
  *
  * @param name  student name; the stream is keyed by this field
  * @param mark  score carried by this record; the trigger compares this field
  *              against its threshold
  * @param count number of attempts this record represents
  */
final case class Marks(name: String, mark: Double, count: Int)

/**
  * Trigger that fires a window as soon as an incoming element's `mark`
  * strictly exceeds `threshold`.
  *
  * The trigger does no averaging itself: it only inspects the `mark` field of
  * the element it is handed, so the upstream pipeline has to supply the
  * average if that is the value that should be compared.
  *
  * @param threshold value `mark` must strictly exceed for the trigger to fire;
  *                  defaults to 90
  * @tparam W window type this trigger is attached to
  */
class MarksTrigger[W <: Window](threshold: Double = 90.0) extends Trigger[Marks, W] {

  /** Fires when the element's mark is above the threshold, otherwise keeps waiting. */
  override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult =
    if (element.mark > threshold) TriggerResult.FIRE
    else TriggerResult.CONTINUE

  /** This trigger registers no processing-time timers, so it never reacts to them. */
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** This trigger registers no event-time timers, so it never reacts to them. */
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /**
    * No-op: the trigger keeps no state and registers no timers, so there is
    * nothing to release. It must not throw, because Flink invokes it when a
    * window is cleaned up.
    */
  override def clear(window: W, ctx: TriggerContext): Unit = ()
}

// Work-in-progress job from the question: reads "name,mark" lines from a
// socket, keys them by student name and attaches MarksTrigger to a global
// window. The window pipeline below is intentionally left unfinished.
object Main {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    // Parse each "name,mark" line into a Marks record; count is 1 per record.
    val fdata = data.map { values =>
      val columns = values.split(",")
      Marks(columns(0), columns(1).toDouble, 1)
    }

    // Key by student name and put every record of a student into one global window.
    val keyed = fdata.keyBy(_.name).
      window(GlobalWindows.create()).
      trigger(new MarksTrigger[GlobalWindow]()). // TODO: unfinished - the chain ends on a dangling `.`; a window function still has to follow here



    keyed.print()
    env.execute()
  }
}

计算平均值:我在批处理模式下尝试了以下代码

/**
  * Per-student result of the batch aggregation.
  *
  * @param name  student name (the grouping key)
  * @param mark  average mark, i.e. the summed marks divided by `count`
  * @param count number of marks that were aggregated for the student
  */
final case class Marks(name: String, mark: Double, count: Int)

// Sample (name, mark) pairs for two students.
val data = benv.fromElements(
  ("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
  ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))

// Attach an attempt counter of 1 to every (name, mark) pair.
val counted = data.map(rec => (rec._1, rec._2, 1))

// Sum marks and counters per student; tuple field 0 is the name.
val totals = counted.groupBy(0).reduce { (acc, next) =>
  (acc._1, acc._2 + next._2, acc._3 + next._3)
}

// Divide the summed marks by the number of attempts to get the average.
totals.map(t => Marks(t._1, t._2 / t._3, t._3)).collect

如何将它们联系在一起?应该在计算平均值之前调用 .window().trigger() 还是应该在 onElement() 内完成平均值计算?

我想出了解决办法

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}


/**
  * Trigger that fires a window as soon as an incoming element's `mark`
  * strictly exceeds `threshold`.
  *
  * The trigger only looks at the `mark` field of the element it receives; in
  * this file `Main` feeds it `Marks` records whose `mark` is already the
  * student's running average.
  *
  * @param threshold value `mark` must strictly exceed for the trigger to fire;
  *                  defaults to 90
  * @tparam W window type this trigger is attached to
  */
class MarksTrigger[W <: Window](threshold: Double = 90.0) extends Trigger[Marks, W] {

  /** Fires once the student's mark crosses the threshold, otherwise keeps waiting. */
  override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult =
    if (element.mark > threshold) TriggerResult.FIRE
    else TriggerResult.CONTINUE

  /** This trigger registers no processing-time timers, so it never reacts to them. */
  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /** This trigger registers no event-time timers, so it never reacts to them. */
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  /**
    * No-op: the trigger keeps no state and registers no timers, so there is
    * nothing to release. It must not throw, because Flink invokes it when a
    * window is cleaned up.
    */
  override def clear(window: W, ctx: TriggerContext): Unit = ()
}

/**
  * Running per-student result produced by the streaming aggregation in `Main`.
  *
  * @param name  student name; the stream is keyed by this field
  * @param mark  average mark so far (summed marks divided by `count`)
  * @param count number of exam attempts seen so far for the student
  */
final case class Marks(name: String, mark: Double, count: Int)

/** Streaming job that emits a student's record once their running average makes MarksTrigger fire. */
object Main {

  /**
    * Builds and runs the pipeline: socket text -> (name, mark, 1) tuples ->
    * per-student running sums -> Marks(name, average, attempts) -> global
    * window guarded by MarksTrigger.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    // data is obtained in "name,mark" format; the trailing 1 counts this attempt
    val fdata = data.map { values =>
      val columns = values.split(",")
      (columns(0), columns(1).toDouble, 1)
    }

    // calculating average mark and number of exam attempts:
    // the keyed reduce keeps running sums of marks and attempts per student,
    // and the map turns each running sum into an average.
    val keyed1 = fdata.keyBy(_._1).reduce { (x, y) =>
      (x._1, x._2 + y._2, x._3 + y._3)
    }.map(x => Marks(x._1, x._2 / x._3, x._3))

    // PurgingTrigger wraps MarksTrigger so the window contents are purged
    // each time it fires; maxBy(1) selects on field position 1 of Marks (`mark`).
    val keyed = keyed1.keyBy(_.name).
      window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(new MarksTrigger[GlobalWindow]())).
      maxBy(1)

    keyed.print()
    env.execute()
  }
}