Random failure in a Kafka Streams stateful application
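Hi, here is a problem I stumbled upon a few days ago and have not been able to solve on my own.
I am using the Scala Streams API v2.0.0.
I have two incoming streams, branched to two handlers for segregation, and both handlers declare a Transformer that uses a common StateStore.
As a quick overview, it looks like this: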
def buildStream(builder: StreamsBuilder, config: Config) = {
  val store = Stores.keyValueStoreBuilder[String, AggregatedState](Stores.persistentKeyValueStore(config.storeName), ...)
  builder.addStateStore(store)
  val handlers = List(handler1, handler2)
  builder
    .stream(config.topic)
    .branch(handlers.map(_.accepts).toList: _*) // Dispatch each event to the first handler accepting it
    .zip(handlers.toList) // (KStream[K, V], Handler)
    .map { case (stream, h) => h.handle(stream) } // Process the event on the matching handler
    .reduce((s1, s2) => s1.merge(s2)) // Merge them back, as they return the same type
    .to(config.output)
  builder
}
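To make the dispatch step concrete: `branch` sends each record to the first predicate that accepts it and drops records that no predicate accepts. Here is a minimal sketch of that behaviour using plain Scala collections instead of Kafka Streams. `BranchSketch`, its `Event` type and the sample predicates are hypothetical names used only for illustration.

```scala
object BranchSketch {
  // Hypothetical event type, standing in for the real Event class.
  final case class Event(kind: String, payload: String)

  type Predicate = (String, Event) => Boolean

  // Route each record to the first predicate that accepts it.
  // Records accepted by no predicate are dropped, mirroring KStream.branch.
  def branch(records: List[(String, Event)],
             predicates: List[Predicate]): List[List[(String, Event)]] = {
    val firstMatch: ((String, Event)) => Int = {
      case (k, v) => predicates.indexWhere(p => p(k, v))
    }
    predicates.indices.toList.map(i => records.filter(r => firstMatch(r) == i))
  }

  def main(args: Array[String]): Unit = {
    val isA: Predicate = (_, e) => e.kind == "a"
    val any: Predicate = (_, _) => true
    val records = List("k1" -> Event("a", "x"), "k2" -> Event("b", "y"))
    // One output list per predicate, in predicate order.
    println(branch(records, List(isA, any)))
  }
}
```

Because only the first match wins, the order of the handlers in the list decides which one sees an event that several of them would accept.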
Each of my handlers looks the same: it takes an event, performs some operations, then passes it through transform() to derive a state and emit an aggregate:
class Handler1(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed
  def handle(stream: KStream[String, Event]) = {
    stream
      // join / map / filter steps omitted
      .transform(new Transformer1(config.storeName))
  }
}
class Handler2(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed
  def handle(stream: KStream[String, Event]) = {
    stream
      // join / map / filter steps omitted
      .transform(new Transformer2(config.storeName))
  }
}
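The transformers use the same StateStore, with the following logic: for a new event, check whether its aggregate exists. If it does, update it, store it and emit the new aggregate. Otherwise, build the aggregate, store it and emit it.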
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class Transformer1(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }

  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))
    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Duplicate: returning null emits nothing downstream
    } else {
      (key, agg) // Emit the new aggregate
    }
  }

  override def close(): Unit = ()
}
class Transformer2(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }

  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))
    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Duplicate: returning null emits nothing downstream
    } else {
      (key, agg) // Emit the new aggregate
    }
  }

  override def close(): Unit = ()
}
Transformer2 is the same; only the business logic changes (how a new event is merged into the aggregated state).
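The update-or-create logic described above can be exercised without Kafka. The sketch below swaps the state store for a mutable map. Everything in it is a made-up stand-in, since the real classes are not shown: `AggregationSketch`, the fields of `Event` and `AggregatedState`, the rule for what counts as terminal, and the duplicate check (here simply "the aggregate did not change").

```scala
import scala.collection.mutable

object AggregationSketch {
  // Hypothetical event: an id plus a flag marking the last event for a key.
  final case class Event(id: String, last: Boolean)

  // Hypothetical aggregate: the set of event ids seen so far.
  final case class AggregatedState(seen: Set[String], isTerminal: Boolean) {
    def updateWith(e: Event): AggregatedState =
      AggregatedState(seen + e.id, isTerminal || e.last)
  }
  object AggregatedState {
    def from(e: Event): AggregatedState = AggregatedState(Set(e.id), e.last)
  }

  // In-memory stand-in for the KeyValueStore.
  val store: mutable.Map[String, AggregatedState] = mutable.Map.empty

  // Returns Some(aggregate) to emit, or None when the event changes nothing.
  def transform(key: String, value: Event): Option[(String, AggregatedState)] = {
    val existing = store.get(key)
    val agg = existing.map(_.updateWith(value)).getOrElse(AggregatedState.from(value))
    // Terminal aggregates are removed; all others are stored.
    if (agg.isTerminal) store.remove(key) else store.update(key, agg)
    if (existing.contains(agg)) None else Some(key -> agg)
  }
}
```

Returning an `Option` here plays the role of the `null` return in the real transformer: `None` means nothing is emitted for that event.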
The problem I have is that on stream startup I get either a normal startup or the following exception:
15:07:23,420 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [job-tracker-prod-5ba8c2f7-d7fd-48b5-af4a-ac78feef71d3-StreamThread-1] Failed to commit stream task 1_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
at org.apache.kafka.streams.processor.internals.AssignedTasks.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:161)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at com.mycompany.streamprocess.Transformer1.transform(Transformer1.scala:49) // Line with store.put(key, agg)
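I already searched for this and found results saying "The transformer uses a Factory Pattern", which is what is used here (since .transform takes the transformer and creates a TransformerSupplier under the hood).
As the error is pseudo-random (I can only reproduce it some of the time), I guess it could be a race condition on startup, but I found nothing conclusive.
Is it because I use the same state store in different transformers?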
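I assume you are hitting https://issues.apache.org/jira/browse/KAFKA-7250
It is fixed in versions 2.0.1 and 2.1.0.
If you cannot upgrade, you need to pass in the TransformerSupplier explicitly, because the Scala API constructs the supplier incorrectly in 2.0.0.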
.transform(() => new Transformer1(config.storeName))
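As I understand KAFKA-7250, the 2.0.0 Scala wrapper ends up handing the same transformer instance to every task, so a later init() overwrites the store reference captured by an earlier one. Here is a minimal sketch of that effect in plain Scala with no Kafka dependency. `SupplierSketch`, `FakeTransformer`, `boundTask` and `startTasks` are hypothetical names for illustration, not part of the Kafka API.

```scala
object SupplierSketch {
  // Hypothetical stand-in for a Transformer that captures per-task state in init().
  final class FakeTransformer {
    var boundTask: Option[String] = None
    def init(taskId: String): Unit = {
      boundTask = Some(taskId)
    }
  }

  // Simulates the runtime asking the supplier for one transformer per task.
  def startTasks(supplier: () => FakeTransformer,
                 taskIds: List[String]): List[FakeTransformer] =
    taskIds.map { id =>
      val t = supplier()
      t.init(id)
      t
    }

  def main(args: Array[String]): Unit = {
    val shared = new FakeTransformer
    // Supplier that always returns the same instance: the second init() wins.
    val broken = startTasks(() => shared, List("0_0", "1_0"))
    // Supplier that builds a new instance per call: each task keeps its own state.
    val fixed = startTasks(() => new FakeTransformer, List("0_0", "1_0"))
    println(broken.map(_.boundTask))
    println(fixed.map(_.boundTask))
  }
}
```

With the shared instance, both tasks end up bound to the last task that was initialised, which would explain why the failure depends on startup order and looks random.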