如何处理 Kafka 流中的不同时区?

How to Handle Different Timezone in Kafka Streams?

所以我正在评估 Kafka Streams 以及它可以做什么,看看它是否适合我的用例,因为我需要每 15 分钟、每小时、每天对传感器数据进行聚合,并发现它很有用,因为它开窗功能。 因为我可以通过在 KGroupedStream 上应用 windowedBy() 来创建 windows 但问题是 windows 是在 UTC 中创建的,我希望我的数据按其原始时区而不是按UTC 时区,因为它阻碍了聚合,所以任何人都可以帮助我。

您可以 "shift" 使用自定义 TimestampExtractor 的时间戳 - 在将结果写回输出主题之前,您可以使用 Transformer 和 "shift"通过 context.forward(key, value, To.all().withTimestamps()).

返回的时间戳

功能请求票:https://issues.apache.org/jira/browse/KAFKA-7911

因此,为了解决这个问题,我创建了自定义 TimestampExtractor 并使用它来更改流 window 创建时间以记录来自有效负载的时间,如下所示。

public class RecordTimeStampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());
        Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());
        return recordTimestamp.getTime();
    }

}

所以现在我已经从昨天开始用我的本地时区测试了它,这是 IST 05:30 并且它的工作正常而且 kafka 流正在根据记录时间戳创建 windows。也会用其他时区进行测试并更新答案