获取先前的 window 值以处理延迟事件
Getting previous window value for processing late events
我正在寻找一种方法来设置窗口以允许迟到,并让我根据之前为会话计算的值计算值。
我的会话值总体上是一个唯一标识符,应该永远不会发生冲突,但从技术上讲,会话可以随时进入。在大多数会话中,大多数事件的处理时间超过 5 分钟,允许迟到 1 天应该可以满足任何迟到的事件。
stream
.keyBy { jsonEvent => jsonEvent.findValue("session").toString }
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
.allowedLateness(Time.days(1))
.process { new SessionProcessor }
.addSink { new HttpSink }
对于每个会话,我都会找到一个字段的最大值,并检查一些事件是否没有发生(如果确实发生了,它们将使最大值字段为零)。我决定创建一个 ProcessWindowFunction
来执行此操作。
Class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
//Parse and calculate data
maxValue = if(badEvent1) 0 else maxValue
maxValue = if(badEvent2) 0 else maxValue
out.collect((string1,string2,string3, maxValue))
}
}
这在允许延迟事件之前工作正常。当延迟事件通过时,maxValue
被重新计算并再次输出到 HttpSink
。我正在寻找一种方法,以便我可以计算之前 maxValue
和后期 maxValue
的增量。
我正在寻找一种方法来确定:
- 如果对函数的调用来自延迟事件(我不想重复计算会话总数)
- 新的数据是什么,或者有没有办法存储之前的计算值。
如有任何帮助,我们将不胜感激。
编辑:用于 ValueState 的新代码
KafkaConsumer.scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema
import org.apache.flink.streaming.api.scala._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode]("test-topic", new JSONDeserializationSchema, properties)
consumer.setStartFromLatest()
val stream = env.addSource(consumer)
stream
.keyBy { jsonEvent => jsonEvent.findValue("data").findValue("query").findValue("session").toString }
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.days(1))
.process {
new SessionProcessor
}
.print
env.execute("Kafka APN Consumer")
}
}
SessionProcessor.scala
import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
final val previousValue = new ValueStateDescriptor("previousValue", classOf[Long])
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
val previousVal: ValueState[Long] = context.windowState.getState(previousValue)
val pVal: Long = previousVal.value match {
case i: Long => i
}
var session = ""
var user = ""
var department = ""
var lVal: Long = 0
elements.foreach( value => {
var jVal: String = "0"
if (value.findValue("data").findValue("query").has("value")) {
jVal = value.findValue("data").findValue("query").findValue("value").toString replaceAll("\"", "")
}
session = value.findValue("data").findValue("query").findValue("session").toString replaceAll("\"", "")
user = value.findValue("data").findValue("query").findValue("user").toString replaceAll("\"", "")
department = value.findValue("data").findValue("query").findValue("department").toString replaceAll("\"", "")
lVal = if (jVal.toLong > lVal) jVal.toLong else lVal
})
val increaseTime = lVal - pVal
previousVal.update(increaseTime)
out.collect((session, user, department, increaseTime))
}
}
这是一个执行类似操作的示例。希望它是合理的不言自明的,并且应该足够简单以适应您的需求。
这里的基本思想是您可以使用 context.windowState()
,这是通过传递给 ProcessWindowFunction 的上下文提供的每个 window 状态。这个 windowState 实际上只对多次触发的 windows 有用,因为每个新的 window 实例都有一个新初始化的(和空的)windowState 存储。对于在所有 windows 之间共享的状态(但仍然是键控的),使用 context.globalState()
.
private static class DifferentialWindowFunction
extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {
private final static ValueStateDescriptor<Long> previousFiringState =
new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);
private final static ReducingStateDescriptor<Long> firingCounterState =
new ReducingStateDescriptor<>("firing-counter", new Sum(), LongSerializer.INSTANCE);
@Override
public void process(
String key,
Context context,
Iterable<Long> values,
Collector<Tuple2<Long, Long>> out) {
ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);
Long output = Iterables.getOnlyElement(values);
if (firingCounter.get() == null) {
// first firing
out.collect(Tuple2.of(0L, output));
} else {
// subsequent firing
out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));
}
firingCounter.add(1L);
previousFiring.update(output);
}
@Override
public void clear(Context context) {
ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);
previousFiring.clear();
firingCounter.clear();
}
}
我正在寻找一种方法来设置窗口以允许迟到,并让我根据之前为会话计算的值计算值。
我的会话值总体上是一个唯一标识符,应该永远不会发生冲突,但从技术上讲,会话可以随时进入。在大多数会话中,大多数事件的处理时间超过 5 分钟,允许迟到 1 天应该可以满足任何迟到的事件。
stream
.keyBy { jsonEvent => jsonEvent.findValue("session").toString }
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
.allowedLateness(Time.days(1))
.process { new SessionProcessor }
.addSink { new HttpSink }
对于每个会话,我都会找到一个字段的最大值,并检查一些事件是否没有发生(如果确实发生了,它们将使最大值字段为零)。我决定创建一个 ProcessWindowFunction
来执行此操作。
Class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
//Parse and calculate data
maxValue = if(badEvent1) 0 else maxValue
maxValue = if(badEvent2) 0 else maxValue
out.collect((string1,string2,string3, maxValue))
}
}
这在允许延迟事件之前工作正常。当延迟事件通过时,maxValue
被重新计算并再次输出到 HttpSink
。我正在寻找一种方法,以便我可以计算之前 maxValue
和后期 maxValue
的增量。
我正在寻找一种方法来确定:
- 如果对函数的调用来自延迟事件(我不想重复计算会话总数)
- 新的数据是什么,或者有没有办法存储之前的计算值。
如有任何帮助,我们将不胜感激。
编辑:用于 ValueState 的新代码
KafkaConsumer.scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema
import org.apache.flink.streaming.api.scala._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode]("test-topic", new JSONDeserializationSchema, properties)
consumer.setStartFromLatest()
val stream = env.addSource(consumer)
stream
.keyBy { jsonEvent => jsonEvent.findValue("data").findValue("query").findValue("session").toString }
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.days(1))
.process {
new SessionProcessor
}
.print
env.execute("Kafka APN Consumer")
}
}
SessionProcessor.scala
import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
final val previousValue = new ValueStateDescriptor("previousValue", classOf[Long])
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
val previousVal: ValueState[Long] = context.windowState.getState(previousValue)
val pVal: Long = previousVal.value match {
case i: Long => i
}
var session = ""
var user = ""
var department = ""
var lVal: Long = 0
elements.foreach( value => {
var jVal: String = "0"
if (value.findValue("data").findValue("query").has("value")) {
jVal = value.findValue("data").findValue("query").findValue("value").toString replaceAll("\"", "")
}
session = value.findValue("data").findValue("query").findValue("session").toString replaceAll("\"", "")
user = value.findValue("data").findValue("query").findValue("user").toString replaceAll("\"", "")
department = value.findValue("data").findValue("query").findValue("department").toString replaceAll("\"", "")
lVal = if (jVal.toLong > lVal) jVal.toLong else lVal
})
val increaseTime = lVal - pVal
previousVal.update(increaseTime)
out.collect((session, user, department, increaseTime))
}
}
这是一个执行类似操作的示例。希望它是合理的不言自明的,并且应该足够简单以适应您的需求。
这里的基本思想是您可以使用 context.windowState()
,这是通过传递给 ProcessWindowFunction 的上下文提供的每个 window 状态。这个 windowState 实际上只对多次触发的 windows 有用,因为每个新的 window 实例都有一个新初始化的(和空的)windowState 存储。对于在所有 windows 之间共享的状态(但仍然是键控的),使用 context.globalState()
.
private static class DifferentialWindowFunction
extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {
private final static ValueStateDescriptor<Long> previousFiringState =
new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);
private final static ReducingStateDescriptor<Long> firingCounterState =
new ReducingStateDescriptor<>("firing-counter", new Sum(), LongSerializer.INSTANCE);
@Override
public void process(
String key,
Context context,
Iterable<Long> values,
Collector<Tuple2<Long, Long>> out) {
ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);
Long output = Iterables.getOnlyElement(values);
if (firingCounter.get() == null) {
// first firing
out.collect(Tuple2.of(0L, output));
} else {
// subsequent firing
out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));
}
firingCounter.add(1L);
previousFiring.update(output);
}
@Override
public void clear(Context context) {
ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);
previousFiring.clear();
firingCounter.clear();
}
}