Flink common state for all keys in the KeyedProcessFunction
I want to keep a simple value that is common to all keys in a KeyedProcessFunction, like this:
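import java.sql.Timestamp
import java.time.Instant
import java.time.temporal.ChronoUnit
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
// A and B are the question's own input/output types
class StateC() extends KeyedProcessFunction[Long, A, B] {
  // Plain field, not Flink keyed state: one copy per parallel instance, shared by all of its keys
  var timestamp: Timestamp = _
  override def open(parameters: Configuration): Unit = {
    timestamp = assignCurrentHour()
  }
  override def processElement(item: A, ctx: KeyedProcessFunction[Long, A, B]#Context, out: Collector[B]): Unit = {
    val currentHour = assignCurrentHour()
    if (currentHour.after(timestamp)) {
      timestamp = currentHour // first event of a new hour seen by this instance
    }
    // ... rest of the processing
  }
  // Hypothetical implementation of the question's helper: processing time truncated to the hour
  private def assignCurrentHour(): Timestamp =
    Timestamp.from(Instant.now().truncatedTo(ChronoUnit.HOURS))
}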
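I just want to know whether a new hour has started. For this, I keep the timestamp variable. The value of timestamp is common to all keys in this TaskManager, so I don't need to declare it per key.
In this case, timestamp will be updated as soon as this TaskManager processes any event in the new hour, right?
Could there be concurrent modification of the timestamp variable?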
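Maybe you can use broadcast state, which holds the same contents on every parallel task (each task keeps its own copy, filled from the same broadcast stream), for example through KeyedBroadcastProcessFunction: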
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>
        extends BaseBroadcastProcessFunction {
private static final long serialVersionUID = -2584726797564976453L;
/**
* This method is called for each element in the (non-broadcast) {@link
* org.apache.flink.streaming.api.datastream.KeyedStream keyed stream}.
*
* <p>It can output zero or more elements using the {@link Collector} parameter, query the
* current processing/event time, and also query and update the local keyed state. In addition,
* it can get a {@link TimerService} for registering timers and querying the time. Finally, it
* has <b>read-only</b> access to the broadcast state. The context is only valid during the
* invocation of this method, do not store it.
*
* @param value The stream element.
* @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
* querying the current processing/event time and iterating the broadcast state with
* <b>read-only</b> access. The context is only valid during the invocation of this method,
* do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
public abstract void processElement(
final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
/**
* This method is called for each element in the {@link
* org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
*
* <p>It can output zero or more elements using the {@link Collector} parameter, query the
* current processing/event time, and also query and update the internal {@link
* org.apache.flink.api.common.state.BroadcastState broadcast state}. In addition, it can
* register a {@link KeyedStateFunction function} to be applied to all keyed states on the local
* partition. These can be done through the provided {@link Context}. The context is only valid
* during the invocation of this method, do not store it.
*
* @param value The stream element.
* @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
* current processing/event time and updating the broadcast state. In addition, it allows
* the registration of a {@link KeyedStateFunction function} to be applied to all keyed
* state with a given {@link StateDescriptor} on the local partition. The context is only
* valid during the invocation of this method, do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
public abstract void processBroadcastElement(
final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
// ... (remaining members omitted)
}
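The implementation you propose looks fine (but keep in mind that I don't know all of your requirements). Each parallel instance of the KeyedProcessFunction has its own copy of timestamp (one per parallel instance, not one per TaskManager, since a TaskManager can run several parallel instances), and each instance updates its copy as soon as it processes an event in a new hour. A KeyedProcessFunction is single-threaded, so you don't have to worry about concurrent updates. (The onTimer method is synchronized with the other methods, so there is nothing to worry about there either.)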
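The two points in the answer (one copy of the field per parallel instance, shared by every key that instance handles, and an update on the first event of a new hour) can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink code: HourTracker is a hypothetical stand-in for one parallel instance of StateC, the two trackers stand in for two parallel subtasks, and explicit Instant arguments replace the question's `now` so the behaviour is deterministic. It has no threading, matching the single-threaded model described in the answer.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of one parallel instance of StateC (no Flink dependency).
public class HourTracker {
    // One copy per instance, shared by every key routed to this instance.
    private Instant currentHour;

    public HourTracker(Instant start) {
        this.currentHour = start.truncatedTo(ChronoUnit.HOURS);
    }

    // Mirrors processElement: returns true when this event is the first one this
    // instance sees in a new hour. The key is ignored on purpose: any key triggers it.
    public boolean processElement(String key, Instant now) {
        Instant hour = now.truncatedTo(ChronoUnit.HOURS);
        if (hour.isAfter(currentHour)) {
            currentHour = hour;
            return true;
        }
        return false;
    }

    public Instant currentHour() {
        return currentHour;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T10:15:00Z");
        // Two trackers model two parallel subtasks, each with its own copy of the field.
        HourTracker subtask0 = new HourTracker(start);
        HourTracker subtask1 = new HourTracker(start);

        List<Boolean> results = new ArrayList<>();
        results.add(subtask0.processElement("a", Instant.parse("2024-01-01T10:59:00Z"))); // same hour
        results.add(subtask0.processElement("b", Instant.parse("2024-01-01T11:01:00Z"))); // new hour, key b
        results.add(subtask0.processElement("a", Instant.parse("2024-01-01T11:30:00Z"))); // already updated
        System.out.println(results);                // [false, true, false]
        System.out.println(subtask0.currentHour()); // 2024-01-01T11:00:00Z
        System.out.println(subtask1.currentHour()); // 2024-01-01T10:00:00Z (no event seen yet)
    }
}
```

The second subtask still reports 10:00 because it has not processed an event in the new hour; each instance only moves forward when one of its own events arrives.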