Kafka Streams PAPI:处理器关闭在启动时被调用并且过于频繁
Kafka Streams PAPI: Processor close gets called on start and too often
如 Confluent docs on Writing PAPI Applications 所示,您应该关闭处理器中使用的存储,覆盖 close 方法。
在 WordCountProcessor 示例中,它显示了应如何在调用 close() 方法时关闭商店。
我做过类似的事情(我不是在 init() 方法中启动它们,而是在 Scala 中使用 lazy val),我发现我的 Processor close() 方法在创建商店后立即被调用, 以及多次。
class EventWindowProcessor(sessionStoreName: String, lastSessionByChannelStoreName: String, lastChannelStoreName: String)
extends AbstractProcesso
// example of a store
private lazy val lastChannelStore: KeyValueStore[MyKey, Channel] =
context()
.getStateStore(lastChannelStoreName)
.asInstanceOf[KeyValueStore[MyKey, Channel]]
override def init(context: ProcessorContext) = {
super.init(context)
}
override def close() = {
logger.info("CLOSING PROCESSOR")
}
override def process(key: String, value: String): Unit = {
// ... my stuff here
}
所以我得到以下输出,显示 processor.close() 在拓扑 运行 的开头被调用了很多次 - 并且在应用程序的后期点也被调用。
[2018-06-08 05:13:16,255] INFO Stream Application starting, name: stream-processor (my.package.StreamProcessorApplication$)
[2018-06-08 05:13:16,760] INFO Topology: Sub-topologies:
Sub-topology: 0
Source: event-source (topics: [events])
--> session-processor
Processor: session-processor (stores: [sessionStoreName, lastSessionByChannelStoreName, lastChannelStoreName])
--> error-event-sink, order-sink, pageviews-sink, session-sink
<-- event-source
Sink: error-event-sink (topic: error-events)
<-- session-processor
Sink: order-sink (topic: orders)
<-- session-processor
Sink: pageviews-sink (topic: pageviews)
<-- session-processor
Sink: session-sink (topic: sessions)
<-- session-processor
Global Stores:
none
(my.package.StreamProcessorApplication$)
[2018-06-08 05:14:01,425] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,539] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,640] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
... (102 lines like that)
[2018-06-08 05:29:05,548] INFO .... my own application logging here
...因此,如果我在该 close() 方法中关闭商店,当我在 process() 中的代码尝试使用它们时,会出现异常,说明商店已关闭。
为什么 processor.close() 在 KafkaStreams 开始时被调用?为什么它经常发生?
不明确关闭门店有哪些风险?
文档中的示例不正确。您不应该关闭商店——商店由 Kafka Streams 管理,Kafka Streams 将为您关闭商店。 (我会做一个 PR 来修复代码示例。感谢您指出。)
关于对 Processor#close()
的调用:预计处理器可能会关闭并重新打开。这发生在重新平衡期间。因此,您的代码必须以一种方式编写,以便它可以在多次调用 init()
和 close()
时正常工作——我们最近为此更新了 JavaDocs(改进的 JavaDocs 将成为 Kafka 2.0 版本的一部分).
如 Confluent docs on Writing PAPI Applications 所示,您应该关闭处理器中使用的存储,覆盖 close 方法。
在 WordCountProcessor 示例中,它显示了应如何在调用 close() 方法时关闭商店。
我做过类似的事情(我不是在 init() 方法中启动它们,而是在 Scala 中使用 lazy val),我发现我的 Processor close() 方法在创建商店后立即被调用, 以及多次。
class EventWindowProcessor(sessionStoreName: String, lastSessionByChannelStoreName: String, lastChannelStoreName: String)
extends AbstractProcesso
// example of a store
private lazy val lastChannelStore: KeyValueStore[MyKey, Channel] =
context()
.getStateStore(lastChannelStoreName)
.asInstanceOf[KeyValueStore[MyKey, Channel]]
override def init(context: ProcessorContext) = {
super.init(context)
}
override def close() = {
logger.info("CLOSING PROCESSOR")
}
override def process(key: String, value: String): Unit = {
// ... my stuff here
}
所以我得到以下输出,显示 processor.close() 在拓扑 运行 的开头被调用了很多次 - 并且在应用程序的后期点也被调用。
[2018-06-08 05:13:16,255] INFO Stream Application starting, name: stream-processor (my.package.StreamProcessorApplication$)
[2018-06-08 05:13:16,760] INFO Topology: Sub-topologies:
Sub-topology: 0
Source: event-source (topics: [events])
--> session-processor
Processor: session-processor (stores: [sessionStoreName, lastSessionByChannelStoreName, lastChannelStoreName])
--> error-event-sink, order-sink, pageviews-sink, session-sink
<-- event-source
Sink: error-event-sink (topic: error-events)
<-- session-processor
Sink: order-sink (topic: orders)
<-- session-processor
Sink: pageviews-sink (topic: pageviews)
<-- session-processor
Sink: session-sink (topic: sessions)
<-- session-processor
Global Stores:
none
(my.package.StreamProcessorApplication$)
[2018-06-08 05:14:01,425] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,539] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,640] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
... (102 lines like that)
[2018-06-08 05:29:05,548] INFO .... my own application logging here
...因此,如果我在该 close() 方法中关闭商店,当我在 process() 中的代码尝试使用它们时,会出现异常,说明商店已关闭。
为什么 processor.close() 在 KafkaStreams 开始时被调用?为什么它经常发生?
不明确关闭门店有哪些风险?
文档中的示例不正确。您不应该关闭商店——商店由 Kafka Streams 管理,Kafka Streams 将为您关闭商店。 (我会做一个 PR 来修复代码示例。感谢您指出。)
关于对 Processor#close()
的调用:预计处理器可能会关闭并重新打开。这发生在重新平衡期间。因此,您的代码必须以一种方式编写,以便它可以在多次调用 init()
和 close()
时正常工作——我们最近为此更新了 JavaDocs(改进的 JavaDocs 将成为 Kafka 2.0 版本的一部分).