使用 Kafka Streams 在输出中设置时间戳无法进行转换
Set timestamp in output with Kafka Streams fails for transformations
假设我们有一个转换器(用 Scala 编写)
new Transformer[String, V, (String, V)]() {
var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
override def transform(key: String, value: V): (String, V) = {
val timestamp = toTimestamp(value)
context.forward(key, value, To.all().withTimestamp(timestamp))
key -> value
}
override def close(): Unit = ()
}
其中 toTimestamp
只是一个函数,returns 是从记录值中获取的时间戳。一旦它被执行,就会有一个 NPE:
Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
at CustomTransformer.transform()
at CustomTransformer.transform()
at org.apache.kafka.streams.scala.kstream.KStream$$anon$$anon.transform(KStream.scala:302)
at org.apache.kafka.streams.scala.kstream.KStream$$anon$$anon.transform(KStream.scala:300)
at
本质上发生的是 ProcessorContextImpl
失败于:
public <K, V> void forward(final K key, final V value, final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
}
final ProcessorNode previousNode = currentNode();
因为 recordContext
没有初始化(只能由 KafkaStreams 在内部完成)。
这是后续问题
如果您使用 transformer
,您需要确保在调用 TransformerSupplier#get()
时创建了一个新的 Transformer
对象。 (比照https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata)
在最初的问题中,我认为是关于导致 NPE 的 context
变量,但现在我意识到这是关于 Kafka Streams 内部结构的。
Scala API 在 2.0.0 中有一个错误,可能会导致重复使用相同的 Transformer
实例 (https://issues.apache.org/jira/browse/KAFKA-7250)。我 认为 你遇到了这个错误。稍微重写一下代码应该可以解决问题。请注意,Kafka 2.0.1 和 Kafka 2.1.0 包含修复程序。
@matthias-j-sax 如果在 Java 代码中重用处理器,则行为相同。
Topology topology = new Topology();
MyProcessor myProcessor = new MyProcessor();
topology.addSource("source", "topic-1")
.addProcessor(
"processor",
() -> {
return myProcessor;
},
"source"
)
.addSink("sink", "topic-2", "processor");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
假设我们有一个转换器(用 Scala 编写)
new Transformer[String, V, (String, V)]() {
var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
override def transform(key: String, value: V): (String, V) = {
val timestamp = toTimestamp(value)
context.forward(key, value, To.all().withTimestamp(timestamp))
key -> value
}
override def close(): Unit = ()
}
其中 toTimestamp
只是一个函数,returns 是从记录值中获取的时间戳。一旦它被执行,就会有一个 NPE:
Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
at CustomTransformer.transform()
at CustomTransformer.transform()
at org.apache.kafka.streams.scala.kstream.KStream$$anon$$anon.transform(KStream.scala:302)
at org.apache.kafka.streams.scala.kstream.KStream$$anon$$anon.transform(KStream.scala:300)
at
本质上发生的是 ProcessorContextImpl
失败于:
public <K, V> void forward(final K key, final V value, final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
}
final ProcessorNode previousNode = currentNode();
因为 recordContext
没有初始化(只能由 KafkaStreams 在内部完成)。
这是后续问题
如果您使用 transformer
,您需要确保在调用 TransformerSupplier#get()
时创建了一个新的 Transformer
对象。 (比照https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata)
在最初的问题中,我认为是关于导致 NPE 的 context
变量,但现在我意识到这是关于 Kafka Streams 内部结构的。
Scala API 在 2.0.0 中有一个错误,可能会导致重复使用相同的 Transformer
实例 (https://issues.apache.org/jira/browse/KAFKA-7250)。我 认为 你遇到了这个错误。稍微重写一下代码应该可以解决问题。请注意,Kafka 2.0.1 和 Kafka 2.1.0 包含修复程序。
@matthias-j-sax 如果在 Java 代码中重用处理器,则行为相同。
Topology topology = new Topology();
MyProcessor myProcessor = new MyProcessor();
topology.addSource("source", "topic-1")
.addProcessor(
"processor",
() -> {
return myProcessor;
},
"source"
)
.addSink("sink", "topic-2", "processor");
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();