Kafka - TimestampExtractor 的问题

Kafka - problems with TimestampExtractor

我用org.apache.kafka:kafka-streams:0.10.0.1

我正在尝试使用似乎不会触发 KStream.Process() 来触发 ("punctuate") 的基于时间序列的流。 (参考

KafkaStreams 配置中,我传递了这个参数(以及其他参数):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

这里,EventTimeExtractor 是一个自定义时间戳提取器(实现了 org.apache.kafka.streams.processor.TimestampExtractor),用于从 JSON 数据中提取时间戳信息。

我希望这会在每条新记录被拉入时调用我的对象(派生自 TimestampExtractor)。所讨论的流是 2 * 10^6 条记录/分钟。我将 punctuate() 设置为 60 秒,它从不触发。我知道数据非常频繁地通过这个跨度,因为它拉动旧值来赶上。

事实上它根本不会被调用。

看来你的做法是正确的。比较段落 "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

不确定,为什么不使用您的自定义时间戳提取器。查看 org.apache.kafka.streams.processor.internals.StreamTask。在构造函数中应该有类似

的东西
TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

检查您的自定义提取器是否被提取...

2017 年 11 月更新: Kafka 1.0 中的 Kafka Streams 现在支持 punctuate() 流时间和处理时间(挂钟时间)行为。所以你可以选择你喜欢的行为。

我觉得你的设置是正确的。

您需要注意的事项:从 Kafka 0.10.0 开始,punctuate() 方法在 stream-time 上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。而stream-time只有在有新的数据记录进来时才会提前,stream-time提前多少是由这些新记录的关联时间戳决定的。

例如:

  • 假设您已将 punctuate() 设置为每 1 分钟调用一次 = 60 * 1000(注意:stream-time 的 1 分钟)。现在,如果碰巧在接下来的 5 分钟内没有收到任何数据,则 punctuate() 根本不会被调用——即使您可能希望它被调用 5 次。为什么?同样,因为punctuate()依赖于stream-time,而stream-time只是根据新收到的数据记录提前。

这是否会导致您所看到的行为?

展望未来:Kafka 项目中已经在讨论如何使 punctuate() 更加灵活,例如不仅基于 stream-time(默认为 event-time)而且还基于 processing-time.

触发它

我认为这是经纪人层面的另一个问题。我使用具有更多 CPU 和 RAM 的实例重建了集群。现在我得到了我预期的结果。

远程观察者注意事项:如果您的 KStream 应用程序表现异常,请查看您的代理并确保它们没有卡在 GC 中并且有足够的'headroom' 用于文件句柄、RAM 等