KeyedCoProcessFunction 的 processElement1 和 processElement2 对状态(state)的访问是原子的吗?还是说它们可能并发地修改共享状态?
Are processElement1 and processElement2 of KeyedCoProcessFunction atomic with respect to state, or could they modify shared state concurrently?
我有 2 个流:一个数据流只包含用于设置 ValueState `passing=true/false` 的标志,另一个控制流把需要通知的用户添加到 MapState 中。当 `passing` 从 `false` 变为 `true` 时,会向 MapState 中尚未收到通知的用户发送通知。
状态的转换方式如下。
以下是处理该逻辑的 KeyedCoProcessFunction:
/**
 * Emits a one-time "passing=true" notification to every registered user whenever the keyed
 * `passing` flag changes from false to true.
 *
 * Input 1 carries user names to register (see processElement1); input 2 carries the flag as a
 * string (see processElement2). Flink calls processElement1, processElement2 and onTimer of one
 * function instance sequentially, so the read-modify-write sequences on keyed state below never
 * interleave and need no extra synchronization.
 */
class TestKeyedCoProcessFunction extends KeyedCoProcessFunction[String, String, String, String] {
  import java.lang.{Boolean => JBoolean}

  // user -> whether that user was already notified during the current passing=true period.
  // Values are java.lang.Boolean to match the TypeInformation handed to the descriptor.
  @transient private var notified: MapState[String, JBoolean] = _
  // Flag for the current key; stays null until the first flag element arrives (read as false).
  @transient private var passing: ValueState[JBoolean] = _

  override def open(parameters: Configuration): Unit = {
    notified = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, JBoolean]("notified", Types.STRING, Types.BOOLEAN))
    passing = getRuntimeContext.getState(
      new ValueStateDescriptor[JBoolean]("passing", Types.BOOLEAN))
    // Keyed state must not be read or written here: open() runs without a current key, so
    // passing.value()/update() would throw. The "not set yet" case is handled by isPassing.
  }

  /** Current flag for the active key; an unset flag counts as false. */
  private def isPassing: Boolean = Option(passing.value()).exists(_.booleanValue)

  /**
   * Registers `user` as not yet notified.
   *
   * @return true if the user was new, false if it was already registered (state unchanged)
   */
  def addUser(user: String): Boolean =
    if (notified.contains(user)) false
    else {
      notified.put(user, JBoolean.FALSE)
      true
    }

  /**
   * Stores the flag carried by `newPassing`. Only the exact string "true" means true; anything
   * else means false (previously an unrecognised value toggled the flag).
   *
   * @return true if the stored flag changed
   */
  def setPassing(newPassing: String): Boolean = {
    val requested = newPassing == "true"
    val changed = requested != isPassing
    if (changed) passing.update(JBoolean.valueOf(requested))
    changed
  }

  /** Emits the notification for `user` and records that it has been sent. */
  private def notifyUser(user: String, collector: Collector[String]): Unit = {
    collector.collect(s"Hey $user passing=true")
    notified.put(user, JBoolean.TRUE)
  }

  /** Notifies every registered user that has not been notified yet. */
  def notifyNotNotifiedUsers(collector: Collector[String]): Unit = {
    val it = notified.entries().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (!entry.getValue.booleanValue) notifyUser(entry.getKey, collector)
    }
  }

  /** Marks every user as not notified, so they are notified again on the next false -> true change. */
  def setNotifiedFalseAll(): Unit = {
    val it = notified.entries().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (entry.getValue.booleanValue) notified.put(entry.getKey, JBoolean.FALSE)
    }
  }

  override def processElement1(user: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    // While passing is true, processElement2 has already notified every registered user, so only a
    // newly added user can be pending; notifying it directly avoids rescanning the whole MapState.
    if (addUser(user) && isPassing) notifyUser(user, collector)
  }

  override def processElement2(newPassing: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    val changed = setPassing(newPassing)
    if (isPassing) notifyNotNotifiedUsers(collector)
    // Turning false re-arms all users for the next false -> true transition.
    else if (changed) setNotifiedFalseAll()
  }
}
在 Flink 中,是否可能出现 processElement1 和 processElement2 同时执行而导致的竞争条件?例如:
t+1 processElement2("true")
t+2 processElement2: setPassing("true")
t+3 processElement2: notifyNotNotifiedUsers() // starts iteration on MapState
t+4 processElement1("new_user")
t+5 processElement1: addUser(user) // adds user to MapState
t+6 processElement1: notifyNotNotifiedUsers() // starts another parallel iteration on MapState resulting in maybe missed/duplicate notification
不会。对于 KeyedCoProcessFunction 的任意一个实例(以及 Flink 中任何用户函数接口的实例),都不可能出现这种竞争条件:processElement1 和 processElement2 不会同时执行,onTimer 也同样安全——它们都由同一个任务线程依次调用。
我有 2 个流:一个数据流只包含用于设置 ValueState `passing=true/false` 的标志,另一个控制流把需要通知的用户添加到 MapState 中。当 `passing` 从 `false` 变为 `true` 时,会向 MapState 中尚未收到通知的用户发送通知。
状态的转换方式如下。
以下是处理该逻辑的 KeyedCoProcessFunction:
/**
 * Emits a one-time "passing=true" notification to every registered user whenever the keyed
 * `passing` flag changes from false to true.
 *
 * Input 1 carries user names to register (see processElement1); input 2 carries the flag as a
 * string (see processElement2). Flink calls processElement1, processElement2 and onTimer of one
 * function instance sequentially, so the read-modify-write sequences on keyed state below never
 * interleave and need no extra synchronization.
 */
class TestKeyedCoProcessFunction extends KeyedCoProcessFunction[String, String, String, String] {
  import java.lang.{Boolean => JBoolean}

  // user -> whether that user was already notified during the current passing=true period.
  // Values are java.lang.Boolean to match the TypeInformation handed to the descriptor.
  @transient private var notified: MapState[String, JBoolean] = _
  // Flag for the current key; stays null until the first flag element arrives (read as false).
  @transient private var passing: ValueState[JBoolean] = _

  override def open(parameters: Configuration): Unit = {
    notified = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, JBoolean]("notified", Types.STRING, Types.BOOLEAN))
    passing = getRuntimeContext.getState(
      new ValueStateDescriptor[JBoolean]("passing", Types.BOOLEAN))
    // Keyed state must not be read or written here: open() runs without a current key, so
    // passing.value()/update() would throw. The "not set yet" case is handled by isPassing.
  }

  /** Current flag for the active key; an unset flag counts as false. */
  private def isPassing: Boolean = Option(passing.value()).exists(_.booleanValue)

  /**
   * Registers `user` as not yet notified.
   *
   * @return true if the user was new, false if it was already registered (state unchanged)
   */
  def addUser(user: String): Boolean =
    if (notified.contains(user)) false
    else {
      notified.put(user, JBoolean.FALSE)
      true
    }

  /**
   * Stores the flag carried by `newPassing`. Only the exact string "true" means true; anything
   * else means false (previously an unrecognised value toggled the flag).
   *
   * @return true if the stored flag changed
   */
  def setPassing(newPassing: String): Boolean = {
    val requested = newPassing == "true"
    val changed = requested != isPassing
    if (changed) passing.update(JBoolean.valueOf(requested))
    changed
  }

  /** Emits the notification for `user` and records that it has been sent. */
  private def notifyUser(user: String, collector: Collector[String]): Unit = {
    collector.collect(s"Hey $user passing=true")
    notified.put(user, JBoolean.TRUE)
  }

  /** Notifies every registered user that has not been notified yet. */
  def notifyNotNotifiedUsers(collector: Collector[String]): Unit = {
    val it = notified.entries().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (!entry.getValue.booleanValue) notifyUser(entry.getKey, collector)
    }
  }

  /** Marks every user as not notified, so they are notified again on the next false -> true change. */
  def setNotifiedFalseAll(): Unit = {
    val it = notified.entries().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (entry.getValue.booleanValue) notified.put(entry.getKey, JBoolean.FALSE)
    }
  }

  override def processElement1(user: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    // While passing is true, processElement2 has already notified every registered user, so only a
    // newly added user can be pending; notifying it directly avoids rescanning the whole MapState.
    if (addUser(user) && isPassing) notifyUser(user, collector)
  }

  override def processElement2(newPassing: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    val changed = setPassing(newPassing)
    if (isPassing) notifyNotNotifiedUsers(collector)
    // Turning false re-arms all users for the next false -> true transition.
    else if (changed) setNotifiedFalseAll()
  }
}
在 Flink 中,是否可能出现 processElement1 和 processElement2 同时执行而导致的竞争条件?例如:
t+1 processElement2("true")
t+2 processElement2: setPassing("true")
t+3 processElement2: notifyNotNotifiedUsers() // starts iteration on MapState
t+4 processElement1("new_user")
t+5 processElement1: addUser(user) // adds user to MapState
t+6 processElement1: notifyNotNotifiedUsers() // starts another parallel iteration on MapState resulting in maybe missed/duplicate notification
不会。对于 KeyedCoProcessFunction 的任意一个实例(以及 Flink 中任何用户函数接口的实例),都不可能出现这种竞争条件:processElement1 和 processElement2 不会同时执行,onTimer 也同样安全——它们都由同一个任务线程依次调用。