如何为单个 flink 作业配置设置两个不同的处理函数?

How to setup two different process functions for single flink job config?

我有一个带有 3 个不同输入(可选)的 flink 作业,每种类型的输入都会发出相同的输出。

输入 1 使用 KeyedProcessFunction() input2 使用 ProcessWindowFuction()

基本上,作业输入是三个输入和一个输出的联合。 我们如何配置 flink 作业,以便对于单个作业,我可以使用上面的两个过程函数。

我想对 input1 使用 KeyedProcessFunction(),对 input2 使用 ProcessWindowFuction()

下面的代码仅适用于 input2,如果我想使用 input1 我必须在作业配置中使用 .process(processFuction()) 而不是 .process(MyProcessWindowFunction()),我们如何配置以便我可以在单个作业中同时使用这两个功能?

    fun setupJob(env: StreamExecutionEnvironment) {

        val testStream = env.sampleStream()
                   .keyBy { it.f0 }
                   .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                   .process(MyProcessWindowFunction())
    
            testStream.map { it.toKafkaMessage() }
                    .kafkaSink<SampleOutput>() }
}

一个 Flink 作业可以包含多个管道。例如,

env.fromSource(input1)
  .keyBy(...)
  .process(new MyKeyedProcessFunction())
  .sinkTo(sink1)

env.fromSource(input2)
  .keyBy(...)
  .window(...)
  .process(new MyProcessWindowFunction())
  .sinkTo(sink2)

env.execute()