Flink: emit an event when the earlier matching event of a pair is not found
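I have two event streams: one emits events that mark the start of an item's lifetime, and the other emits events that mark the end of an item's lifetime. (The streams can be joined on itemId.)
How can I emit a new event in Flink for each itemId that has only an "end of lifetime" event and no corresponding start event? (The start and end events may be hours or days apart.)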
You can implement this with a stateful FlatMapFunction on a KeyedStream.
The following code snippet should do what you need.
    import org.apache.flink.streaming.api.scala._

    val stream1: DataStream[Event1] = ???
    val stream2: DataStream[Event2] = ???

    // map both streams to (ID, isStart) so that they share a common type
    val ids1: DataStream[(Int, Boolean)] = stream1.map(e => (e.id, true))
    val ids2: DataStream[(Int, Boolean)] = stream2.map(e => (e.id, false))

    // union both streams
    val ids = ids1.union(ids2)

    val onlyEOL: DataStream[Int] = ids
      // organize stream by ID
      .keyBy(_._1)
      // use a stateful FlatMapFunction to check that bol arrived before eol
      .flatMapWithState {
        (value: (Int, Boolean), state: Option[Boolean]) =>
          if (value._2) {
            // bol event -> emit nothing and set state to true
            (List.empty[Int], Some(true))
          } else if (state.contains(true)) {
            // eol event, bol was seen before -> emit nothing and remove state
            (List.empty[Int], None)
          } else {
            // eol event, bol was NOT seen before -> emit ID and remove state
            (List(value._1), None)
          }
      }
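To check the per-key logic without a Flink cluster, the transition function can be modelled in plain Scala. The following is only a sketch: `OrphanEolSketch`, `step` and `run` are hypothetical names that are not part of Flink, and the `Map` stands in for the keyed state that `keyBy` would maintain.

```scala
// Pure-Scala model of the per-key transition used in flatMapWithState.
// No Flink dependency; OrphanEolSketch, step and run are hypothetical names.
object OrphanEolSketch {
  // (id, isStart): true marks a start-of-lifetime event, false an end-of-lifetime event
  type Tagged = (Int, Boolean)

  // One transition: returns the IDs to emit and the new state for this key.
  def step(value: Tagged, state: Option[Boolean]): (List[Int], Option[Boolean]) =
    value match {
      case (_, true)                          => (Nil, Some(true)) // start: remember it
      case (_, false) if state.contains(true) => (Nil, None)       // matched end: clear state
      case (id, false)                        => (List(id), None)  // unmatched end: emit ID
    }

  // Replays events in arrival order, keeping one state slot per ID
  // (a stand-in for Flink's keyed state).
  def run(events: Seq[Tagged]): List[Int] = {
    val (emitted, _) =
      events.foldLeft((List.empty[Int], Map.empty[Int, Boolean])) {
        case ((out, states), ev @ (id, _)) =>
          val (emit, next) = step(ev, states.get(id))
          val updated = next match {
            case Some(s) => states.updated(id, s)
            case None    => states - id
          }
          (out ++ emit, updated)
      }
    emitted
  }
}
```

For example, `OrphanEolSketch.run(Seq((1, true), (1, false), (2, false)))` returns `List(2)`, because only ID 2 ends without a start. The model also makes the ordering assumption visible: `run(Seq((3, false), (3, true)))` returns `List(3)`, so an end event that arrives before its start is reported as unmatched, and the late start then leaves state behind for that ID.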