使用自定义 TimestampExtractor 的 Kafka 流窗口
Kafka Streams Windowing with Custom TimestampExtractor
我正在尝试创建一个 Kafka Streams 应用程序,我正在尝试计算一段时间内每个平台的唯一设备数 window。
事件Class
public class Event {
private String eventId;
private String deviceId;
private String platform;
private ZonedDateTime createdAt;
}
我需要时间 window 尊重事件的创建时间,所以我写了一个 TimestampExtractor
实现,如下所示:
public class EventTimestampExtractor implements TimestampExtractor {
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
final Event event = (Event) record.value();
final ZonedDateTime eventCreationTime = event.getCreatedAt();
final long timestamp = eventCreationTime.toEpochSecond();
log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);
return timestamp;
}
}
最后,这是我的流媒体应用程序代码:
final KStream<String, Event> eventStream = builder.stream("events_ingestion");
eventStream
.selectKey((key, event) -> {
final String platform = event.getPlatform();
final String deviceId = event.getDeviceId());
return String.join("::", platform, deviceId);
})
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
.count(Materialized.as(COUNT_STORE));
当我将事件推送到 event_ingestion
主题时,我可以看到时间戳已记录到应用程序日志中,并且数据正在写入计数存储中。
当我遍历计数存储时,我看到以下内容:
Key: [ANDROID::1@1539000000/1539900000], Value: 2
虽然我的时间window是15分钟,关键是跨越了10天。如果我从流配置中删除我的 TimestampExtractor 实现(因此返回到处理时间),密钥按预期跨越 15 分钟:
Key: [ANDROID::1@1539256500000/1539257400000], Value: 1
我在这里做错了什么?有什么想法吗?
TimestampExtractor 使用纪元毫秒值进行 windowing。您正在计算 "seconds",这会将消息放入错误的时间 window。
我正在尝试创建一个 Kafka Streams 应用程序,我正在尝试计算一段时间内每个平台的唯一设备数 window。
事件Class
public class Event {
private String eventId;
private String deviceId;
private String platform;
private ZonedDateTime createdAt;
}
我需要时间 window 尊重事件的创建时间,所以我写了一个 TimestampExtractor
实现,如下所示:
public class EventTimestampExtractor implements TimestampExtractor {
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
final Event event = (Event) record.value();
final ZonedDateTime eventCreationTime = event.getCreatedAt();
final long timestamp = eventCreationTime.toEpochSecond();
log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);
return timestamp;
}
}
最后,这是我的流媒体应用程序代码:
final KStream<String, Event> eventStream = builder.stream("events_ingestion");
eventStream
.selectKey((key, event) -> {
final String platform = event.getPlatform();
final String deviceId = event.getDeviceId());
return String.join("::", platform, deviceId);
})
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
.count(Materialized.as(COUNT_STORE));
当我将事件推送到 event_ingestion
主题时,我可以看到时间戳已记录到应用程序日志中,并且数据正在写入计数存储中。
当我遍历计数存储时,我看到以下内容:
Key: [ANDROID::1@1539000000/1539900000], Value: 2
虽然我的时间window是15分钟,关键是跨越了10天。如果我从流配置中删除我的 TimestampExtractor 实现(因此返回到处理时间),密钥按预期跨越 15 分钟:
Key: [ANDROID::1@1539256500000/1539257400000], Value: 1
我在这里做错了什么?有什么想法吗?
TimestampExtractor 使用纪元毫秒值进行 windowing。您正在计算 "seconds",这会将消息放入错误的时间 window。