Ververica Flink 培训资料之 ExpiringStateSolution 之谜
The puzzles of ExpiringStateSolution from Ververica Flink Training materials
不幸的是,原来训练的ververica被更改并重定向到另一个页面,导致我无法再次查看这个例子的介绍,我确实找到了一些其他的例子,但是对于这个特定的,我没有找到它,这是我最近一直在挣扎的事情,
核心部分代码片段如下:
第一种处理ride stream的方法
@Override
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
TimerService service = context.timerService();
System.out.println("ride time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
System.out.println("ride state ===> " + fare);
if (fare != null) {
System.out.println("fare is not null ===>" + fare.rideId);
fareState.clear();
context.timerService().deleteEventTimeTimer(fare.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
System.out.println("update ride state ===> " + ride.rideId + "===>" + context.timestamp());
rideState.update(ride);
System.out.println(rideState.value());
// as soon as the watermark arrives, we can stop waiting for the corresponding fare
context.timerService().registerEventTimeTimer(ride.getEventTime());
}
}
票价流处理方法二
@Override
public void processElement2(TaxiFare fare, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TimerService service = context.timerService();
System.out.println("fare time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
TaxiRide ride = rideState.value();
System.out.println("fare state ===> " + ride);
if (ride != null) {
System.out.println("ride is not null ===> " + ride.rideId);
rideState.clear();
context.timerService().deleteEventTimeTimer(ride.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
System.out.println("update fare state ===> " + fare.rideId + "===>" + context.timestamp());
fareState.update(fare);
System.out.println(fareState.value() + "===>" + fareState.value().getEventTime());
// as soon as the watermark arrives, we can stop waiting for the corresponding ride
context.timerService().registerEventTimeTimer(fare.getEventTime());
}
}
processElement1 显然用于 TaxiRide 流,2 用于 TaxiFare,
首先是在执行processElement1之前会运行 processElement2一段时间,直到现在才找到原因,这里是打印部分
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
fare state ===> null
update fare state ===> 26===>1356998400000
update fare state ===> 58===>1356998400000
58,2013000058,2013000058,2013-01-01 00:00:00,CRD,2.0,0.0,27.0===>1356998400000
26,2013000026,2013000026,2013-01-01 00:00:00,CRD,2.0,0.0,12.5===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 47===>1356998400000
9,2013000009,2013000009,2013-01-01 00:00:00,CRD,1.0,0.0,6.0===>1356998400000
47,2013000047,2013000047,2013-01-01 00:00:00,CRD,0.9,0.0,5.9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 54===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
54,2013000054,2013000054,2013-01-01 00:00:00,CSH,0.0,0.0,31.0===>1356998400000
第二个是,因为ValueState是关于一个值的,而不是一个包含很多值的列表,每次调用processElemnt2,如果ride为null,它会去else,调用后fareState.update(),它会改变 ValueState 的值,在我看来,这意味着它认为 ValueState 的先前值是匹配的,对吧? -----最大的难题
感谢您的回答,非常感谢您的帮助!
state and connected streams 上的新教程应该可以帮助您解决问题。但简而言之:
您无法控制 processElement1
和 processElement2
回调的调用顺序。这两个输入流相互竞争,Flink 运行time 将按照它想要的方式处理来自一个流或另一个流的事件。在时间 and/or 排序很重要的情况下,您可能会发现有必要在托管 Flink 状态中缓冲事件,直到您的应用程序准备好处理它们。
ValueState 是一种 keyed state,这意味着每当访问或更新状态时,状态后端中的条目 因为 context 中的键被读取或写入。 “上下文中的键”是正在处理的流元素的键(在 processElement
回调的情况下),或者是创建计时器的键(在 onTimer
回调的情况下) .
此外,请记住,在本练习中,每个键最多有一个 TaxiRide 和一个 TaxiFare。
本练习的参考解决方案说明了一种思考如何管理可能泄漏的状态的方法,但在这种情况下,显然没有一个正确的答案。本练习的目的是激发对如何使用状态和计时器的一些思考,并使所涉及的一些问题浮出水面。
好的解决方案的目标是什么?应该
- 产生正确的结果
- 不漏状态
- 通俗易懂
- 表现不错
现在让我们考虑这些目标来研究建议的解决方案。我们在 processElement1
中找到这段代码(顺便说一句,processElement2
是相同的,只是乘车和票价之间的角色互换):
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
context.timerService().deleteEventTimeTimer(fare.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
rideState.update(ride);
// as soon as the watermark arrives, we can stop waiting for the corresponding fare
context.timerService().registerEventTimeTimer(ride.getEventTime());
}
}
这意味着
- 每当没有完成一对的事件到达时,我们将其存储在状态中并创建一个计时器
- 每当完成一对的事件到达时,我们清除状态并删除匹配事件的计时器(较早存储)
所以很明显,如果两个事件都到达,则不会泄漏任何内容。但是万一少了一个怎么办?
在那种情况下,计时器会在某个时刻触发,运行这段代码将清楚地清除可能存在的任何状态:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
if (fareState.value() != null) {
ctx.output(unmatchedFares, fareState.value());
fareState.clear();
}
if (rideState.value() != null) {
ctx.output(unmatchedRides, rideState.value());
rideState.clear();
}
}
好的,但是我们是如何决定等待多长时间的?等到 ride.getEventTime()
就够了吗?
为 ride.getEventTime()
设置事件时间计时器的效果是等待行程和票价流中的任何乱序问题得到解决。当水印达到 ride.getEventTime()
时,所有较早的乘车和票价事件都将到达,假设水印是完美的。
在这些练习中,加水印实际上是完美的——不会有延迟事件。但是在现实环境中,您应该预料到一些延迟事件,并且我们应该预料到我们的实现在这种情况下会正确运行。该参考解决方案的作用是:
- 匹配对中的一个事件将先到达,并创建一个计时器来安排其最终删除
- 计时器将触发,事件将被清除
- 匹配事件迟到,并创建另一个计时器,在本例中时间已经过去
- 下一个到达的水印触发该定时器,状态被清除
换句话说,当事件迟到时,不会泄漏任何状态,但不会产生由此产生的连接。因此,如果您希望在数据迟到的情况下仍然产生结果,您应该创建计时器以通过将必要的状态保留一段时间来适应一些延迟,例如,
context.timerService().registerEventTimeTimer(ride.getEventTime() + ALLOWED_LATENESS);
尝试适应任意延迟的事件不是一个好主意,因为这样做需要无限期地为每个延迟事件保留一些状态。
使用处理时间计时器怎么样?
当然可以,但测试起来可能更麻烦。
为什么不使用状态生存时间呢?
这是个好主意。一般来说,您可能想考虑使用 State TTL 来实现 GDPR 合规性(例如),并使用计时器来实现业务逻辑。
不幸的是,原来训练的ververica被更改并重定向到另一个页面,导致我无法再次查看这个例子的介绍,我确实找到了一些其他的例子,但是对于这个特定的,我没有找到它,这是我最近一直在挣扎的事情,
核心部分代码片段如下: 第一种处理ride stream的方法
@Override
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
TimerService service = context.timerService();
System.out.println("ride time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
System.out.println("ride state ===> " + fare);
if (fare != null) {
System.out.println("fare is not null ===>" + fare.rideId);
fareState.clear();
context.timerService().deleteEventTimeTimer(fare.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
System.out.println("update ride state ===> " + ride.rideId + "===>" + context.timestamp());
rideState.update(ride);
System.out.println(rideState.value());
// as soon as the watermark arrives, we can stop waiting for the corresponding fare
context.timerService().registerEventTimeTimer(ride.getEventTime());
}
}
票价流处理方法二
@Override
public void processElement2(TaxiFare fare, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TimerService service = context.timerService();
System.out.println("fare time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
TaxiRide ride = rideState.value();
System.out.println("fare state ===> " + ride);
if (ride != null) {
System.out.println("ride is not null ===> " + ride.rideId);
rideState.clear();
context.timerService().deleteEventTimeTimer(ride.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
System.out.println("update fare state ===> " + fare.rideId + "===>" + context.timestamp());
fareState.update(fare);
System.out.println(fareState.value() + "===>" + fareState.value().getEventTime());
// as soon as the watermark arrives, we can stop waiting for the corresponding ride
context.timerService().registerEventTimeTimer(fare.getEventTime());
}
}
processElement1 显然用于 TaxiRide 流,2 用于 TaxiFare, 首先是在执行processElement1之前会运行 processElement2一段时间,直到现在才找到原因,这里是打印部分
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
fare state ===> null
update fare state ===> 26===>1356998400000
update fare state ===> 58===>1356998400000
58,2013000058,2013000058,2013-01-01 00:00:00,CRD,2.0,0.0,27.0===>1356998400000
26,2013000026,2013000026,2013-01-01 00:00:00,CRD,2.0,0.0,12.5===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 47===>1356998400000
9,2013000009,2013000009,2013-01-01 00:00:00,CRD,1.0,0.0,6.0===>1356998400000
47,2013000047,2013000047,2013-01-01 00:00:00,CRD,0.9,0.0,5.9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 54===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
54,2013000054,2013000054,2013-01-01 00:00:00,CSH,0.0,0.0,31.0===>1356998400000
第二个是,因为ValueState是关于一个值的,而不是一个包含很多值的列表,每次调用processElemnt2,如果ride为null,它会去else,调用后fareState.update(),它会改变 ValueState 的值,在我看来,这意味着它认为 ValueState 的先前值是匹配的,对吧? -----最大的难题 感谢您的回答,非常感谢您的帮助!
state and connected streams 上的新教程应该可以帮助您解决问题。但简而言之:
您无法控制
processElement1
和processElement2
回调的调用顺序。这两个输入流相互竞争,Flink 运行time 将按照它想要的方式处理来自一个流或另一个流的事件。在时间 and/or 排序很重要的情况下,您可能会发现有必要在托管 Flink 状态中缓冲事件,直到您的应用程序准备好处理它们。ValueState 是一种 keyed state,这意味着每当访问或更新状态时,状态后端中的条目 因为 context 中的键被读取或写入。 “上下文中的键”是正在处理的流元素的键(在
processElement
回调的情况下),或者是创建计时器的键(在onTimer
回调的情况下) .
此外,请记住,在本练习中,每个键最多有一个 TaxiRide 和一个 TaxiFare。
本练习的参考解决方案说明了一种思考如何管理可能泄漏的状态的方法,但在这种情况下,显然没有一个正确的答案。本练习的目的是激发对如何使用状态和计时器的一些思考,并使所涉及的一些问题浮出水面。
好的解决方案的目标是什么?应该
- 产生正确的结果
- 不漏状态
- 通俗易懂
- 表现不错
现在让我们考虑这些目标来研究建议的解决方案。我们在 processElement1
中找到这段代码(顺便说一句,processElement2
是相同的,只是乘车和票价之间的角色互换):
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
context.timerService().deleteEventTimeTimer(fare.getEventTime());
out.collect(new Tuple2(ride, fare));
} else {
rideState.update(ride);
// as soon as the watermark arrives, we can stop waiting for the corresponding fare
context.timerService().registerEventTimeTimer(ride.getEventTime());
}
}
这意味着
- 每当没有完成一对的事件到达时,我们将其存储在状态中并创建一个计时器
- 每当完成一对的事件到达时,我们清除状态并删除匹配事件的计时器(较早存储)
所以很明显,如果两个事件都到达,则不会泄漏任何内容。但是万一少了一个怎么办?
在那种情况下,计时器会在某个时刻触发,运行这段代码将清楚地清除可能存在的任何状态:
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
if (fareState.value() != null) {
ctx.output(unmatchedFares, fareState.value());
fareState.clear();
}
if (rideState.value() != null) {
ctx.output(unmatchedRides, rideState.value());
rideState.clear();
}
}
好的,但是我们是如何决定等待多长时间的?等到 ride.getEventTime()
就够了吗?
为 ride.getEventTime()
设置事件时间计时器的效果是等待行程和票价流中的任何乱序问题得到解决。当水印达到 ride.getEventTime()
时,所有较早的乘车和票价事件都将到达,假设水印是完美的。
在这些练习中,加水印实际上是完美的——不会有延迟事件。但是在现实环境中,您应该预料到一些延迟事件,并且我们应该预料到我们的实现在这种情况下会正确运行。该参考解决方案的作用是:
- 匹配对中的一个事件将先到达,并创建一个计时器来安排其最终删除
- 计时器将触发,事件将被清除
- 匹配事件迟到,并创建另一个计时器,在本例中时间已经过去
- 下一个到达的水印触发该定时器,状态被清除
换句话说,当事件迟到时,不会泄漏任何状态,但不会产生由此产生的连接。因此,如果您希望在数据迟到的情况下仍然产生结果,您应该创建计时器以通过将必要的状态保留一段时间来适应一些延迟,例如,
context.timerService().registerEventTimeTimer(ride.getEventTime() + ALLOWED_LATENESS);
尝试适应任意延迟的事件不是一个好主意,因为这样做需要无限期地为每个延迟事件保留一些状态。
使用处理时间计时器怎么样?
当然可以,但测试起来可能更麻烦。
为什么不使用状态生存时间呢?
这是个好主意。一般来说,您可能想考虑使用 State TTL 来实现 GDPR 合规性(例如),并使用计时器来实现业务逻辑。