Kafka - TimestampExtractor 的问题
Kafka - problems with TimestampExtractor
我用org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用似乎不会触发 KStream.Process()
来触发 ("punctuate") 的基于时间序列的流。 (参考 )
在 KafkaStreams
配置中,我传递了这个参数(以及其他参数):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
这里,EventTimeExtractor
是一个自定义时间戳提取器(实现了 org.apache.kafka.streams.processor.TimestampExtractor
),用于从 JSON 数据中提取时间戳信息。
我希望这会在每条新记录被拉入时调用我的对象(派生自 TimestampExtractor
)。所讨论的流是 2 * 10^6 条记录/分钟。我将 punctuate()
设置为 60 秒,它从不触发。我知道数据非常频繁地通过这个跨度,因为它拉动旧值来赶上。
事实上它根本不会被调用。
- 这是在 KStream 记录上设置时间戳的错误方法吗?
- 这是声明此配置的错误方式吗?
看来你的做法是正确的。比较段落 "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
不确定,为什么不使用您的自定义时间戳提取器。查看 org.apache.kafka.streams.processor.internals.StreamTask
。在构造函数中应该有类似
的东西
TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
检查您的自定义提取器是否被提取...
2017 年 11 月更新: Kafka 1.0 中的 Kafka Streams 现在支持 punctuate()
流时间和处理时间(挂钟时间)行为。所以你可以选择你喜欢的行为。
我觉得你的设置是正确的。
您需要注意的事项:从 Kafka 0.10.0 开始,punctuate()
方法在 stream-time 上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。而stream-time只有在有新的数据记录进来时才会提前,stream-time提前多少是由这些新记录的关联时间戳决定的。
例如:
- 假设您已将
punctuate()
设置为每 1 分钟调用一次 = 60 * 1000
(注意:stream-time 的 1 分钟)。现在,如果碰巧在接下来的 5 分钟内没有收到任何数据,则 punctuate()
根本不会被调用——即使您可能希望它被调用 5 次。为什么?同样,因为punctuate()
依赖于stream-time,而stream-time只是根据新收到的数据记录提前。
这是否会导致您所看到的行为?
展望未来:Kafka 项目中已经在讨论如何使 punctuate()
更加灵活,例如不仅基于 stream-time
(默认为 event-time
)而且还基于 processing-time
.
触发它
我认为这是经纪人层面的另一个问题。我使用具有更多 CPU 和 RAM 的实例重建了集群。现在我得到了我预期的结果。
远程观察者注意事项:如果您的 KStream 应用程序表现异常,请查看您的代理并确保它们没有卡在 GC 中并且有足够的'headroom' 用于文件句柄、RAM 等
我用org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用似乎不会触发 KStream.Process()
来触发 ("punctuate") 的基于时间序列的流。 (参考
在 KafkaStreams
配置中,我传递了这个参数(以及其他参数):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
这里,EventTimeExtractor
是一个自定义时间戳提取器(实现了 org.apache.kafka.streams.processor.TimestampExtractor
),用于从 JSON 数据中提取时间戳信息。
我希望这会在每条新记录被拉入时调用我的对象(派生自 TimestampExtractor
)。所讨论的流是 2 * 10^6 条记录/分钟。我将 punctuate()
设置为 60 秒,它从不触发。我知道数据非常频繁地通过这个跨度,因为它拉动旧值来赶上。
事实上它根本不会被调用。
- 这是在 KStream 记录上设置时间戳的错误方法吗?
- 这是声明此配置的错误方式吗?
看来你的做法是正确的。比较段落 "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
不确定,为什么不使用您的自定义时间戳提取器。查看 org.apache.kafka.streams.processor.internals.StreamTask
。在构造函数中应该有类似
TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
检查您的自定义提取器是否被提取...
2017 年 11 月更新: Kafka 1.0 中的 Kafka Streams 现在支持 punctuate()
流时间和处理时间(挂钟时间)行为。所以你可以选择你喜欢的行为。
我觉得你的设置是正确的。
您需要注意的事项:从 Kafka 0.10.0 开始,punctuate()
方法在 stream-time 上运行(默认情况下,即基于默认时间戳提取器,流时间将表示事件时间)。而stream-time只有在有新的数据记录进来时才会提前,stream-time提前多少是由这些新记录的关联时间戳决定的。
例如:
- 假设您已将
punctuate()
设置为每 1 分钟调用一次 =60 * 1000
(注意:stream-time 的 1 分钟)。现在,如果碰巧在接下来的 5 分钟内没有收到任何数据,则punctuate()
根本不会被调用——即使您可能希望它被调用 5 次。为什么?同样,因为punctuate()
依赖于stream-time,而stream-time只是根据新收到的数据记录提前。
这是否会导致您所看到的行为?
展望未来:Kafka 项目中已经在讨论如何使 punctuate()
更加灵活,例如不仅基于 stream-time
(默认为 event-time
)而且还基于 processing-time
.
我认为这是经纪人层面的另一个问题。我使用具有更多 CPU 和 RAM 的实例重建了集群。现在我得到了我预期的结果。
远程观察者注意事项:如果您的 KStream 应用程序表现异常,请查看您的代理并确保它们没有卡在 GC 中并且有足够的'headroom' 用于文件句柄、RAM 等