Transforming the results of a KStream windowed aggregation

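I have one-minute session windows over some activity coming through one of my topics. I want to take the results of each window and transform them once more to get the final results that I want to push to a sink. These are the DSL calls I use to compute the aggregate: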

val sessionProcessorStream = builder.stream("collector-prod", Consumed.`with`(Serdes.String, Serdes.String)) //[String,String]
  .filter((_, value) => filterRequest(value))
  .transform(valTransformer,"valTransformState") //[String,String]
  .groupByKey()
  .windowedBy(SessionWindows.`with`(TimeUnit.MINUTES.toMillis(1))) //[Windowed[String],String]
  .aggregate(sessionInitializer,sessionAggregator,sessionMerger,sessionMaterialized) //[Windowed[String],SessionEvent]
  .toStream      
  .transform(sessionTransformer,"sessionTransformState") // [String,Long]

When I run this and process two events in the same window, I get the following error:

Exception in thread "9-fc11122f-0db3-401b-bdaa-2480eacb8e74-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store session-agg-store7
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
    at org.apache.kafka.streams.processor.internals.StreamTask.run(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:857)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3079)
    at SessionEventDeserializer.deserialize(SessionEventDeserializer.scala:17)
    at SessionEventDeserializer.deserialize(SessionEventDeserializer.scala:8)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:158)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:173)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.apply(CachingSessionStore.java:88)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
    at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    ... 14 more

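I'm not sure of the exact cause of the exception, but I do think the key of the stream going into the last transform is a Windowed[String], whereas I thought, and expected, it would be a plain String.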
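A minimal sketch of what that Windowed[String] key carries, assuming the Kafka Streams Windowed and Window classes: key() returns the original String record key and window() the session bounds in epoch milliseconds. The names WindowedKeys, plainKey and describe are hypothetical, for illustration only.

```scala
import org.apache.kafka.streams.kstream.Windowed

// Hypothetical helper object; not part of Kafka Streams.
object WindowedKeys {
  // key() is the plain String record key the session was built around.
  def plainKey(k: Windowed[String]): String = k.key()

  // Renders the key with its session bounds (epoch millis), handy for
  // logging what toStream actually emits.
  def describe(k: Windowed[String]): String =
    s"${k.key()}@[${k.window().start()}, ${k.window().end()}]"
}
```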

I tried to fix this by adding a .map after toStream:
  .map { (_,v) => {
      new KeyValue[String,SessionEvent](v.name,v)
  }}  

But that gives me a compile-time error that I can't figure out:

Error:(106, 8) no type parameters for method map: (x: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]])org.apache.kafka.streams.kstream.KStream[KR,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]])
 --- because ---
argument expression's type is not compatible with formal parameter type;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: ?KR, _ <: ?VR]]
      .map { (_,v) => {
Error:(106, 20) type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]
      .map { (_,v) => {
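One way past this inference failure, sketched under the assumption that the Java Kafka Streams API is called from Scala as in the question: pass the type arguments to map explicitly and spell out the KeyValueMapper, so scalac does not have to infer KR and VR through the Java wildcard bounds. SessionEvent here is a hypothetical stand-in with only a name field, and Rekey/toPlainKeys are illustrative names.

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream, KeyValueMapper, Windowed}
import scala.beans.BeanProperty

// Hypothetical stand-in for the question's SessionEvent aggregate.
class SessionEvent {
  @BeanProperty var name: String = _
}

object Rekey {
  // Explicit map[String, SessionEvent] fixes KR and VR up front, so the
  // mapper only has to conform to the wildcard-bounded parameter type.
  def toPlainKeys(windowed: KStream[Windowed[String], SessionEvent]): KStream[String, SessionEvent] =
    windowed.map[String, SessionEvent](
      new KeyValueMapper[Windowed[String], SessionEvent, KeyValue[String, SessionEvent]] {
        override def apply(key: Windowed[String], value: SessionEvent): KeyValue[String, SessionEvent] =
          new KeyValue(key.key(), value) // or value.name, as in the question
      })
}
```

Called as Rekey.toPlainKeys(aggregated.toStream), this yields a KStream[String, SessionEvent] that a transformer expecting String keys can consume. Dropping the Windowed wrapper also drops the session bounds, so carry them in the value if the sink needs them.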

What is the correct way to transform the results of a windowed aggregation? Any help would be appreciated, and I'm happy to clarify anything further.

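The error comes from "flush", so I doubt it's the transformer after the aggregation; it's the aggregation itself. I assume there is a bug in your SessionEventDeserializer, since it throws a NullPointerException. Are you handling null correctly? Your deserializer must be able to handle null and return null in this case.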
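A minimal sketch of a null-tolerant deserializer, assuming the question's SessionEventDeserializer wraps a Jackson ObjectMapper (as the stack trace suggests) and that SessionEvent is a bean with a no-arg constructor; the SessionEvent below is a hypothetical stand-in. The null most likely appears because two events in the same session cause the earlier session to be merged away and deleted, and that deletion sits in the cache as a null value that gets deserialized on flush.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Deserializer
import scala.beans.BeanProperty

// Hypothetical stand-in for the question's aggregate type.
class SessionEvent {
  @BeanProperty var name: String = _
}

class SessionEventDeserializer extends Deserializer[SessionEvent] {
  private val mapper = new ObjectMapper()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()

  // A null payload (e.g. a deleted session) must map to null, not to a
  // Jackson call, which is where the NullPointerException came from.
  override def deserialize(topic: String, data: Array[Byte]): SessionEvent =
    if (data == null) null
    else mapper.readValue(data, classOf[SessionEvent])

  override def close(): Unit = ()
}
```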