Flink common state for all keys in the KeyedProcessFunction

I want to keep one simple value that is shared by all keys in a KeyedProcessFunction, like this:

class StateC() extends KeyedProcessFunction[Long, A, B] {
  var timestamp: Timestamp = _

  override def open(parameters: Configuration): Unit = {
    timestamp = assignCurrentHour()
  }

  override def processElement(item: A, ... ): Unit = {
    val currentHour = now.truncateToHour()
    if (currentHour.after(timestamp)) {
      timestamp = assignCurrentHour()
    }
    ....
  }
}

I just want to know whether I am in a new hour. For that, I keep the timestamp variable. Its value is common to all keys in this TaskManager, so I don't need to declare it per key.

In this case, the timestamp variable will be updated as soon as this TaskManager processes any event in the new hour, right?

Could there be concurrent modification of the timestamp variable?

Maybe you can use broadcast state, which is state common to all tasks.

For detailed usage, see https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

private static final long serialVersionUID = -2584726797564976453L;

/**
 * This method is called for each element in the (non-broadcast) {@link
 * org.apache.flink.streaming.api.datastream.KeyedStream keyed stream}.
 *
 * <p>It can output zero or more elements using the {@link Collector} parameter, query the
 * current processing/event time, and also query and update the local keyed state. In addition,
 * it can get a {@link TimerService} for registering timers and querying the time. Finally, it
 * has <b>read-only</b> access to the broadcast state. The context is only valid during the
 * invocation of this method, do not store it.
 *
 * @param value The stream element.
 * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
 *     querying the current processing/event time and iterating the broadcast state with
 *     <b>read-only</b> access. The context is only valid during the invocation of this method,
 *     do not store it.
 * @param out The collector to emit resulting elements to
 * @throws Exception The function may throw exceptions which cause the streaming program to fail
 *     and go into recovery.
 */
public abstract void processElement(
        final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

/**
 * This method is called for each element in the {@link
 * org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
 *
 * <p>It can output zero or more elements using the {@link Collector} parameter, query the
 * current processing/event time, and also query and update the internal {@link
 * org.apache.flink.api.common.state.BroadcastState broadcast state}. In addition, it can
 * register a {@link KeyedStateFunction function} to be applied to all keyed states on the local
 * partition. These can be done through the provided {@link Context}. The context is only valid
 * during the invocation of this method, do not store it.
 *
 * @param value The stream element.
 * @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
 *     current processing/event time and updating the broadcast state. In addition, it allows
 *     the registration of a {@link KeyedStateFunction function} to be applied to all keyed
 *     state with a given {@link StateDescriptor} on the local partition. The context is only
 *     valid during the invocation of this method, do not store it.
 * @param out The collector to emit resulting elements to
 * @throws Exception The function may throw exceptions which cause the streaming program to fail
 *     and go into recovery.
 */
public abstract void processBroadcastElement(
        final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
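
As a rough illustration of how the two callbacks above might be used for the hour marker, here is a minimal sketch. It assumes a second stream that emits the start of each hour as epoch milliseconds. The class name HourBroadcastFunction, the descriptor HOUR_DESCRIPTOR, the map key "hour", and the chosen type parameters (Long keys, String events) are all hypothetical and not part of the question. It needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical function: keys are Long, events are String,
// broadcast elements are hour-start timestamps (epoch millis).
public class HourBroadcastFunction
        extends KeyedBroadcastProcessFunction<Long, String, Long, String> {

    // Descriptor shared by the function and the call to broadcast(...).
    public static final MapStateDescriptor<String, Long> HOUR_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "current-hour",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.LONG_TYPE_INFO);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Keyed side: read-only access to the broadcast state.
        Long hour = ctx.getBroadcastState(HOUR_DESCRIPTOR).get("hour");
        if (hour == null) {
            // No hour marker has been broadcast yet.
            out.collect(value + " (hour unknown)");
        } else {
            out.collect(value + " (hour starts at " + hour + ")");
        }
    }

    @Override
    public void processBroadcastElement(Long hourStart, Context ctx, Collector<String> out)
            throws Exception {
        // Broadcast side: every parallel instance receives the element
        // and stores it, so all instances see the same hour marker.
        ctx.getBroadcastState(HOUR_DESCRIPTOR).put("hour", hourStart);
    }
}
```

Wiring it up would look roughly like `events.keyBy(...).connect(hours.broadcast(HourBroadcastFunction.HOUR_DESCRIPTOR)).process(new HourBroadcastFunction())`, where `events` and `hours` are assumed stream names.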

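Your proposed implementation looks fine (but keep in mind that I don't know all of your requirements). Each parallel instance of the KeyedProcessFunction has its own copy of the timestamp, and each instance updates its timestamp as soon as it processes an event in a new hour. A KeyedProcessFunction is single-threaded: you don't have to worry about concurrent updates. (The onTimer method is synchronized with the other methods; there is nothing to worry about there.)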
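
The per-instance behaviour described above can be sketched without Flink. The class below is a hypothetical stand-in for the timestamp field plus the question's assignCurrentHour() and truncateToHour() helpers, which the question does not show. It assumes hours are computed on UTC instants. Each HourTracker object plays the role of one parallel instance of the function.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: holds the start of the hour last seen by one instance.
final class HourTracker {
    private Instant currentHour;

    // Corresponds to open(): remember the hour in which the instance started.
    HourTracker(Instant now) {
        this.currentHour = now.truncatedTo(ChronoUnit.HOURS);
    }

    // Corresponds to the check in processElement(): returns true and
    // updates the stored hour only when `now` falls in a later hour.
    boolean rollOverIfNewHour(Instant now) {
        Instant hour = now.truncatedTo(ChronoUnit.HOURS);
        if (hour.isAfter(currentHour)) {
            currentHour = hour;
            return true;
        }
        return false;
    }

    Instant currentHour() {
        return currentHour;
    }
}
```

Because the field is plain instance state, two trackers (like two parallel instances) roll over independently: one that has seen no event in the new hour still holds the old value. A plain field like this is also not checkpointed, so after a restart it would be re-initialised in open().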