如何为单个 flink 作业配置设置两个不同的处理函数?
How to setup two different process functions for single flink job config?
我有一个带有 3 个不同输入(可选)的 flink 作业,每种类型的输入都会发出相同的输出。
输入 1 使用 KeyedProcessFunction()
input2 使用 ProcessWindowFuction()
基本上,作业输入是三个输入和一个输出的联合。
我们如何配置 flink 作业,以便对于单个作业,我可以使用上面的两个过程函数。
我想对 input1
使用 KeyedProcessFunction()
,对 input2
使用 ProcessWindowFuction()
。
下面的代码仅适用于 input2
,如果我想使用 input1
我必须在作业配置中使用 .process(processFuction())
而不是 .process(MyProcessWindowFunction())
,我们如何配置以便我可以在单个作业中同时使用这两个功能?
fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())
testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}
一个 Flink 作业可以包含多个管道。例如,
env.fromSource(input1)
.keyBy(...)
.process(new MyKeyedProcessFunction())
.sinkTo(sink1)
env.fromSource(input2)
.keyBy(...)
.window(...)
.process(new MyProcessWindowFunction())
.sinkTo(sink2)
env.execute()
我有一个带有 3 个不同输入(可选)的 flink 作业,每种类型的输入都会发出相同的输出。
输入 1 使用 KeyedProcessFunction()
input2 使用 ProcessWindowFuction()
基本上,作业输入是三个输入和一个输出的联合。 我们如何配置 flink 作业,以便对于单个作业,我可以使用上面的两个过程函数。
我想对 input1
使用 KeyedProcessFunction()
,对 input2
使用 ProcessWindowFuction()
。
下面的代码仅适用于 input2
,如果我想使用 input1
我必须在作业配置中使用 .process(processFuction())
而不是 .process(MyProcessWindowFunction())
,我们如何配置以便我可以在单个作业中同时使用这两个功能?
fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())
testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}
一个 Flink 作业可以包含多个管道。例如,
env.fromSource(input1)
.keyBy(...)
.process(new MyKeyedProcessFunction())
.sinkTo(sink1)
env.fromSource(input2)
.keyBy(...)
.window(...)
.process(new MyProcessWindowFunction())
.sinkTo(sink2)
env.execute()