有没有办法在 Flink 中按事件时间顺序压缩两个或多个流?

Is there any way to zip two or more streams with order of event time in Flink?

假设我们有一个格式如下的数据流:

输入数据流示例:

case class InputElement(key:String,objectType:String,value:Boolean)
val env = ExecutionEnvironment.getExecutionEnvironment
val inputStream:DataSet[InputElement] = env.fromElements(
    InputElement("k1","t1",true)
    ,InputElement("k2","t1",true)
    ,InputElement("k2","t2",true)
    ,InputElement("k1","t2",false)
    ,InputElement("k1","t2",true)
    ,InputElement("k1","t1",false)
    ,InputElement("k2","t2",false)
)

在语义上等同于拥有这些流:

val inputStream_k1_t1 = env.fromElements(
    InputElement("k1","t1",true),
    InputElement("k1","t1",false)
)
val inputStream_k1_t2 = env.fromElements(
    InputElement("k1","t2",false),
    InputElement("k1","t2",true)
)
val inputStream_k2_t1 = env.fromElements(
    InputElement("k2","t1",true)
)
val inputStream_k2_t2 = env.fromElements(
    InputElement("k2","t2",true),
    InputElement("k2","t2",false)
)

我想要这样的输出类型:

case class OutputElement(key:String,values:Map[String,Boolean])

示例输入数据的预期输出数据流:

val expectedOutputStream = env.fromElements(
    OutputElement("k1",Map( "t1"->true ,"t2"->false)),
    OutputElement("k2",Map("t1"->true,"t2"->true)),
    OutputElement("k1",Map("t1"->false,"t2"->true)),
    OutputElement("k2",Map("t2"->false))
)

==========================================

编辑 1:

经过对问题的一些考虑,问题的主题发生了变化:

我想要另一个输入流来显示哪些键订阅了哪些对象类型:

case class SubscribeRule(strategy:String,patterns:Set[String])
val subscribeStream: DataStream[SubscribeRule] = env.fromElements(

      SubscribeRule("s1",Set("p1","p2")),
      SubscribeRule("s2",Set("p1","p2"))    
    )

现在我想要这个输出:

(在接收到所有订阅的对象类型之前,结果流不会发出任何东西):

val expectedOutputStream = env.fromElements(
    OutputElement("k1",Map( "t1"->true ,"t2"->false)),
    OutputElement("k2",Map("t1"->true,"t2"->true)),
    OutputElement("k1",Map("t1"->false,"t2"->true)),
//      OutputElement("k2",Map("t2"->false)) # this element will emit when a k2-t1 input message received
)

App.scala:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

object App {
  // Intermediate result of ZippingFunc's state update: the drained per-type
  // queues plus the map of values to emit as one output row.
  // (Lower-case class name kept for source compatibility with ZippingFunc's import.)
  case class updateStateResult(updatedState: Map[String, List[Boolean]], output: Map[String, Boolean])
  // One event on the main stream: `key` is the strategy, `objectType` the
  // pattern this boolean result belongs to.
  case class InputElement(key: String, objectType: String, passed: Boolean)
  // Broadcast rule: strategy `strategy` expects one value per pattern in `patterns`.
  case class SubscribeRule(strategy: String, patterns: Set[String])
  // One zipped output row: exactly one boolean per subscribed object type.
  case class OutputElement(key: String, result: Map[String, Boolean])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // NOTE: no checkpointing is configured here; call env.enableCheckpointing(...)
    // before running this against a real source.

    // Broadcast side input: which patterns each strategy key subscribes to.
    val subscribeStream: DataStream[SubscribeRule] = env.fromElements(
      SubscribeRule("s1", Set("p1", "p2")),
      SubscribeRule("s2", Set("p1", "p2"))
    )
    // Descriptor must match the one ZippingFunc uses to read the broadcast state.
    val broadcastStateDescriptor =
      new MapStateDescriptor[String, Set[String]]("subscribes", classOf[String], classOf[Set[String]])
    val subscribeStreamBroadcast: BroadcastStream[SubscribeRule] =
      subscribeStream.broadcast(broadcastStateDescriptor)

    val inputStream = env.fromElements(
      InputElement("s1", "p1", true),
      InputElement("s1", "p2", true),
      InputElement("s2", "p1", false),
      InputElement("s2", "p2", true),
      InputElement("s2", "p2", false),
      InputElement("s1", "p1", false),
      InputElement("s2", "p1", true),
      InputElement("s1", "p2", true)
    )

    // Expected zipped output for the sample input above (kept for reference;
    // compare against the printed results).
    val expected = List(
      OutputElement("s1", Map("p2" -> true, "p1" -> true)),
      OutputElement("s2", Map("p2" -> true, "p1" -> false)),
      OutputElement("s2", Map("p2" -> false, "p1" -> true)),
      OutputElement("s1", Map("p2" -> true, "p1" -> false))
    )

    // Key the main stream by strategy, attach the broadcast rules, and zip.
    val keyedInputStream: KeyedStream[InputElement, String] = inputStream.keyBy(_.key)
    val result = keyedInputStream
      .connect(subscribeStreamBroadcast)
      .process(new ZippingFunc())
    result.print() // side-effecting sink — parentheses by convention
    env.execute("test stream")
  }
}

ZippingFunc.scala

import App.{InputElement, OutputElement, SubscribeRule, updateStateResult}
import org.apache.flink.api.common.state.{ MapState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import java.util.{Map => JavaMap}
import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter}

/**
  * Zips per-key boolean results so that one output row is emitted as soon as
  * every subscribed object type has at least one pending value. Values for the
  * same object type are emitted in arrival order (FIFO).
  */
class ZippingFunc extends KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement] {

  // Per-key queue of not-yet-emitted values per object type.
  // Invariant: the list head is the OLDEST pending value (arrival order).
  private var localState: MapState[String, List[Boolean]] = _

  // Must match the descriptor used when the broadcast stream was created in App.
  private lazy val broadcastStateDesc =
    new MapStateDescriptor[String, Set[String]]("subscribes", classOf[String], classOf[Set[String]])

  override def open(parameters: Configuration) {
    val localStateDesc: MapStateDescriptor[String, List[Boolean]] =
      new MapStateDescriptor[String, List[Boolean]]("sourceMap1", classOf[String], classOf[List[Boolean]])
    localState = getRuntimeContext.getMapState(localStateDesc)
  }

  /**
    * Enqueues `value` for `objectType` and, if every tracked type now has a
    * pending value, pops the oldest value of each type and returns the next
    * output row; returns None while some queue is still empty.
    */
  def updateVar(objectType: String, value: Boolean): Option[Map[String, Boolean]] = {
    // Append at the tail so the head stays the oldest element. The previous
    // prepend (`value :: values`) combined with headOption in pickOutputs
    // emitted values newest-first, breaking the required event-time order.
    // Option(...) guards against a null queue for a type we never initialized.
    // Queues stay short (drained as soon as complete), so :+ on List is fine.
    val values = Option(localState.get(objectType)).getOrElse(List.empty)
    localState.put(objectType, values :+ value)
    pickOutputs(localState.entries().asScala).map { (ur: updateStateResult) =>
      localState.putAll(ur.updatedState.asJava)
      ur.output
    }
  }

  /**
    * If every entry has at least one pending value, returns the oldest value of
    * each type (as the output map) together with the drained queues; otherwise None.
    */
  def pickOutputs(entries: Iterable[JavaMap.Entry[String, List[Boolean]]]): Option[updateStateResult] = {
    // For each type: Some((type, oldestValue, rest)) if non-empty, else None.
    val mapped: Iterable[Option[(String, Boolean, List[Boolean])]] = entries.map {
      (x: JavaMap.Entry[String, List[Boolean]]) =>
        val key: String = x.getKey
        val value: List[Boolean] = x.getValue
        value.headOption.map(h => (key, h, value.tail))
    }
    // sequenceOption: Some only when ALL queues are non-empty. The list it
    // returns is reversed, but both results become Maps, so order is irrelevant.
    sequenceOption(mapped).map { (x: List[(String, Boolean, List[Boolean])]) =>
      updateStateResult(
        x.map(y => (y._1, y._3)).toMap,
        x.map(y => (y._1, y._2)).toMap
      )
    }
  }

  /** Some(list of all values) if every element is Some, else None.
    * Note: the resulting list is in reverse input order (prepend-fold). */
  def sequenceOption[A](l: Iterable[Option[A]]): Option[List[A]] =
    l.foldLeft[Option[List[A]]](Some(List.empty[A])) { (acc, e) =>
      for {
        xs <- acc
        x <- e
      } yield x :: xs
    }

  override def processElement(value: InputElement, ctx: KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement]#ReadOnlyContext, out: Collector[OutputElement]): Unit = {
    val bs: ReadOnlyBroadcastState[String, Set[String]] = ctx.getBroadcastState(broadcastStateDesc)
    // Drop elements for keys whose SubscribeRule has not arrived yet.
    if (bs.contains(value.key)) {
      val allPatterns: Set[String] = bs.get(value.key)
      // Ensure every subscribed type has a (possibly empty) queue so that
      // pickOutputs waits for all of them before emitting anything.
      allPatterns.foreach { patternName =>
        if (!localState.contains(patternName))
          localState.put(patternName, List.empty)
      }
      // Ignore object types the key did not subscribe to; previously such an
      // element read a null queue and threw a NullPointerException.
      if (allPatterns.contains(value.objectType)) {
        updateVar(value.objectType, value.passed).foreach { (r: Map[String, Boolean]) =>
          out.collect(OutputElement(value.key, r))
        }
      }
    }
  }

  /** Stores/overwrites the subscription rule for a strategy in broadcast state. */
  override def processBroadcastElement(value: SubscribeRule, ctx: KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement]#Context, out: Collector[OutputElement]): Unit = {
    val bs = ctx.getBroadcastState(broadcastStateDesc)
    bs.put(value.strategy, value.patterns)
  }
}