如何聚合window session flink中的元素?
How to aggregate the elements in the window sessions flink?
我正在使用 flink Session windows 当它在一段时间内没有接收到元素时,即;当出现不活动间隙时,它应该发出一个事件。
我在flink作业中配置了10秒的间隔。我发送了事件 1,并在 5 秒后发送了事件 2。这两个事件应该属于第一个window。输出应该是这两个事件的集合。但我只收到第一个事件。
下面是我试过的代码:
fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())
testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}
然后 MyProcessWindowFunction 看起来像
class MyProcessWindowFunction : ProcessWindowFunction<Tuple4<String, inputA?, inputB?, inputC?>, Tuple2<String, SampleOutput?>,
String, TimeWindow>() {
private lateinit var sampleOutputState: ValueState<SampleOutputState>
override fun open(parameters: Configuration) {
val SampleOutputStateDescriptor = ValueStateDescriptor("sample-output-state", SampleOutputState::class.java)
SampleOutputState = runtimeContext.getState(SampleOutputStateDescriptor)
}
override fun process(key: String, context: Context, elements: MutableIterable<Tuple4<String, inputA?, inputB?, inputC?>, out: Collector<Tuple2<String, SampleOutput?>>) {
val current = sampleOutputState.value()
val value = elements.iterator().next()
val latestState = when {
value.f2 != null -> processCondition(value.f2!!, current)
else -> return
}
sampleOutputState.update(latestState)
out.collect(Tuple2(key, latestState))
}
private fun processInputB(inputB: InputB, currentState: SampleOutputState?): SampleOutputState {
return currentState?.copy(
timestamp = System.currentTimeMillis(),
eventTime = condition.eventTime,
) ?:
createInputBState(inputB)
}
private fun createInputBState(inputB: InputB): SampleOutputState = SampleOutputState(
id = UUID.randomUUID().toString(),
timestamp = System.currentTimeMillis(),
eventTime = condition.eventTime,
)
}
我得到了唯一的 event1,但我想得到这两个事件的总和(我发送了 event1 和 event2)。
我们如何获取会话中可用事件的聚合?
分配给 window 的所有事件都将在可迭代中发送到 ProcessWindowFunction
的 process
方法。您目前只查看带有
的第一个元素
val value = elements.iterator().next()
您需要遍历 elements
以生成聚合结果。
我正在使用 flink Session windows 当它在一段时间内没有接收到元素时,即;当出现不活动间隙时,它应该发出一个事件。
我在flink作业中配置了10秒的间隔。我发送了事件 1,并在 5 秒后发送了事件 2。这两个事件应该属于第一个window。输出应该是这两个事件的集合。但我只收到第一个事件。
下面是我试过的代码:
fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())
testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}
然后 MyProcessWindowFunction 看起来像
class MyProcessWindowFunction : ProcessWindowFunction<Tuple4<String, inputA?, inputB?, inputC?>, Tuple2<String, SampleOutput?>,
String, TimeWindow>() {
private lateinit var sampleOutputState: ValueState<SampleOutputState>
override fun open(parameters: Configuration) {
val SampleOutputStateDescriptor = ValueStateDescriptor("sample-output-state", SampleOutputState::class.java)
SampleOutputState = runtimeContext.getState(SampleOutputStateDescriptor)
}
override fun process(key: String, context: Context, elements: MutableIterable<Tuple4<String, inputA?, inputB?, inputC?>, out: Collector<Tuple2<String, SampleOutput?>>) {
val current = sampleOutputState.value()
val value = elements.iterator().next()
val latestState = when {
value.f2 != null -> processCondition(value.f2!!, current)
else -> return
}
sampleOutputState.update(latestState)
out.collect(Tuple2(key, latestState))
}
private fun processInputB(inputB: InputB, currentState: SampleOutputState?): SampleOutputState {
return currentState?.copy(
timestamp = System.currentTimeMillis(),
eventTime = condition.eventTime,
) ?:
createInputBState(inputB)
}
private fun createInputBState(inputB: InputB): SampleOutputState = SampleOutputState(
id = UUID.randomUUID().toString(),
timestamp = System.currentTimeMillis(),
eventTime = condition.eventTime,
)
}
我得到了唯一的 event1,但我想得到这两个事件的总和(我发送了 event1 和 event2)。
我们如何获取会话中可用事件的聚合?
分配给 window 的所有事件都将在可迭代中发送到 ProcessWindowFunction
的 process
方法。您目前只查看带有
val value = elements.iterator().next()
您需要遍历 elements
以生成聚合结果。