Are processElement1 and processElement2 from KeyedCoProcessFunction atomic w.r.t. state? or could they modify a shared state concurrently?

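I have 2 streams: a data stream that contains only flags setting a ValueState passing=true/false, and a control stream that adds the users to be notified to a MapState. When passing changes from false to true, a notification is sent to every user in the MapState who has not been notified yet.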

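This is how the state transitions: when passing changes from false to true, every user in the MapState who has not been notified yet gets a notification and is marked as notified; when passing changes from true to false, every user's notified flag is reset to false; and a user added while passing is true is notified immediately.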

Here is the KeyedCoProcessFunction that implements this logic:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

class TestKeyedCoProcessFunction extends KeyedCoProcessFunction[String, String, String, String] {

    // Types.BOOLEAN is TypeInformation[java.lang.Boolean], so the state types use java.lang.Boolean
    @transient private var notified: MapState[String, java.lang.Boolean] = _ // user -> was_notified
    @transient private var passing: ValueState[java.lang.Boolean] = _ // issue notifications when updated to passing=true

    override def open(parameters: Configuration): Unit = {
      val notifiedDescriptor = new MapStateDescriptor("notified", Types.STRING, Types.BOOLEAN)
      notified = getRuntimeContext.getMapState(notifiedDescriptor)

      val passingDescriptor = new ValueStateDescriptor("passing", Types.BOOLEAN)
      passing = getRuntimeContext.getState(passingDescriptor)
      // Keyed state must not be read or written here: open() runs without a current key.
      // An unset flag (null) is treated as false by isPassing instead.
    }

    // Null-safe read of the current key's flag; defaults to false.
    private def isPassing: Boolean = java.lang.Boolean.TRUE.equals(passing.value())

    // Registers a user; returns false if the user was already known.
    def addUser(user: String): Boolean = {
      if (notified.contains(user)) {
        false
      } else {
        notified.put(user, false)
        true
      }
    }

    // Applies a "true"/"false" flag; returns true if the stored flag actually changed.
    def setPassing(newPassing: String): Boolean = {
      if (isPassing) {
        if (newPassing == "true") {
          false
        } else {
          passing.update(false)
          true
        }
      } else {
        if (newPassing == "false") {
          false
        } else {
          passing.update(true)
          true
        }
      }
    }

    // Emits a notification for every user not notified yet and marks them as notified.
    def notifyNotNotifiedUsers(collector: Collector[String]): Unit = {
      val keys = notified.keys().iterator()
      while (keys.hasNext) {
        val user = keys.next()
        val userNotified = notified.get(user)
        if (!userNotified) {
          collector.collect("Hey " + user + " passing=true")
          notified.put(user, true)
        }
      }
    }

    // Resets every user's flag so they are notified again on the next false -> true change.
    def setNotifiedFalseAll(): Unit = {
      val keys = notified.keys().iterator()
      while (keys.hasNext) {
        val user = keys.next()
        val userNotified = notified.get(user)
        if (userNotified) {
          notified.put(user, false)
        }
      }
    }

    override def processElement1(user: String,
                                 context: KeyedCoProcessFunction[String, String, String, String]#Context,
                                 collector: Collector[String]): Unit = {
      addUser(user)
      if (isPassing) {
        notifyNotNotifiedUsers(collector)
      }
    }

    override def processElement2(newPassing: String,
                                 context: KeyedCoProcessFunction[String, String, String, String]#Context,
                                 collector: Collector[String]): Unit = {
      val modified = setPassing(newPassing)
      if (isPassing) {
        notifyNotNotifiedUsers(collector)
      } else {
        if (modified) {
          setNotifiedFalseAll()
        }
      }
    }
}
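To check the transition logic without a Flink cluster, the per-key behaviour can be mirrored in plain Scala. This is a hypothetical model, not Flink API: `NotifierModel`, `onUser` and `onFlag` are illustrative names, an `Option[Boolean]` stands in for the ValueState (None means never set), and a `LinkedHashMap` stands in for the MapState. It assumes the flag stream only ever carries "true" or "false".

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of one key's state in TestKeyedCoProcessFunction.
// Assumes flags are only "true" or "false".
final class NotifierModel {
  private var passing: Option[Boolean] = None                           // stands in for ValueState
  private val notified = mutable.LinkedHashMap.empty[String, Boolean]   // stands in for MapState

  private def isPassing: Boolean = passing.contains(true)

  // Notify every user whose flag is still false, then mark them as notified.
  private def notifyPending(): List[String] = {
    val pending = notified.collect { case (user, false) => user }.toList
    pending.foreach(user => notified.update(user, true))
    pending.map(user => s"Hey $user passing=true")
  }

  // Counterpart of processElement1: register the user, notify at once if passing.
  def onUser(user: String): List[String] = {
    if (!notified.contains(user)) notified.update(user, false)
    if (isPassing) notifyPending() else Nil
  }

  // Counterpart of processElement2: apply the flag, notify or reset as needed.
  def onFlag(newPassing: String): List[String] = {
    val next = newPassing == "true"
    val modified = next != isPassing
    if (modified) passing = Some(next)
    if (isPassing) notifyPending()
    else {
      if (modified) notified.keys.toList.foreach(user => notified.update(user, false))
      Nil
    }
  }
}
```

Replaying a short sequence: `onUser("alice")` emits nothing, `onFlag("true")` notifies alice, `onUser("bob")` notifies bob right away, and a later `onFlag("false")` followed by `onFlag("true")` notifies both again. Because each call finishes before the next begins, no user is skipped or notified twice.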

Is it possible in Flink to get a race condition where processElement1 and processElement2 execute concurrently, for example:

t+1 processElement2("true")
t+2 processElement2: setPassing("true")
t+3 processElement2: notifyNotNotifiedUsers() // starts iteration on MapState
t+4 processElement1("new_user")
t+5 processElement1: addUser(user) // adds user to MapState
t+6 processElement1: notifyNotNotifiedUsers() // starts another parallel iteration on MapState resulting in maybe missed/duplicate notification

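There is no possibility of a race condition within any given instance of a KeyedCoProcessFunction, or, for that matter, in any of Flink's user function interfaces. processElement1 and processElement2 can never execute concurrently: each parallel instance is driven by a single task thread, so one call runs to completion before the next one starts. onTimer is also safe, because timer callbacks are serialized with the element-processing calls in the same way.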
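The reasoning in the answer can be illustrated with a simplified model of a single task thread. Nothing below is Flink code: `SingleThreadedTaskModel`, `Input1`, `Input2` and the queue are hypothetical stand-ins that only approximate Flink's actual task thread (a mailbox loop in recent versions). Two producer threads enqueue events concurrently, yet every handler call runs on the one task thread, so plain mutable state needs no locks.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical model: two producer threads stand in for the two input channels,
// and one task thread drains a single queue, calling one handler at a time.
object SingleThreadedTaskModel {
  sealed trait Event
  final case class Input1(user: String) extends Event
  final case class Input2(flag: String) extends Event
  case object Stop extends Event

  final case class Result(handlerThreads: Set[String], users: Int, flagUpdates: Int)

  def run(eventsPerInput: Int): Result = {
    val mailbox = new LinkedBlockingQueue[Event]()

    // Plain, unsynchronized state: only the task thread reads or writes it.
    val handlerThreads = mutable.Set.empty[String]
    var users = 0
    var flagUpdates = 0

    def processElement1(user: String): Unit = {
      handlerThreads += Thread.currentThread().getName
      users += 1
    }
    def processElement2(flag: String): Unit = {
      handlerThreads += Thread.currentThread().getName
      flagUpdates += 1
    }

    // Each handler call returns before the next event is taken from the queue.
    val taskLoop: Runnable = () => {
      var running = true
      while (running) {
        mailbox.take() match {
          case Input1(user) => processElement1(user)
          case Input2(flag) => processElement2(flag)
          case Stop         => running = false
        }
      }
    }
    val input1: Runnable = () => (1 to eventsPerInput).foreach(i => mailbox.put(Input1(s"user-$i")))
    val input2: Runnable = () => (1 to eventsPerInput).foreach(i => mailbox.put(Input2((i % 2 == 0).toString)))

    val task = new Thread(taskLoop, "task-thread")
    val producers = Seq(new Thread(input1), new Thread(input2))
    task.start()
    producers.foreach(_.start())
    producers.foreach(_.join())
    mailbox.put(Stop) // FIFO queue: lands after every real event
    task.join()       // join() makes the task thread's writes visible here
    Result(handlerThreads.toSet, users, flagUpdates)
  }
}
```

Running `SingleThreadedTaskModel.run(10000)` reports a single handler thread and exact counts for both inputs. If the queue were instead drained by a thread pool, handler calls could overlap and the unsynchronized counters could lose updates, which is the scenario in the question; within one parallel instance Flink does not dispatch calls that way.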