Kafka Streams: Monthly time windows
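Based on this example (https://github.com/confluentinc/kafka-streams-examples/blob/5.5.0-post/src/test/java/io/confluent/examples/streams/window/DailyTimeWindows.java), I want to create a monthly time window.
The problem is the size method: I don't know what size to return, since every month has a different length.
For more context, I want to count the unique users, by userId, who made a transaction within a month.
The current implementation of the windowsFor method: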
    public Map<Long, TimeWindow> windowsFor(final long timestamp) {
        final Instant instant = Instant.ofEpochMilli(timestamp);
        final ZonedDateTime zonedDateTime = instant.atZone(this.zoneId);
        // Window starts at local midnight on the first day of the record's month.
        final ZonedDateTime startTime = zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
        // Window ends at the start of the following month.
        final ZonedDateTime endTime = startTime.plusMonths(1);
        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
        windows.put(toEpochMilli(startTime), new TimeWindow(toEpochMilli(startTime), toEpochMilli(endTime)));
        return windows;
    }
Does anyone have an idea?
"The problem is the size method: I don't know the size, since every month has a different size."
You can convert the month into its number of days and add that to the start time. You also need to take care of leap years.
    public Map<Long, TimeWindow> windowsFor(final long timestamp) {
        final Instant instant = Instant.ofEpochMilli(timestamp);
        final ZonedDateTime zonedDateTime = instant.atZone(zoneId);
        final ZonedDateTime startTime = zonedDateTime.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
        // Add the actual number of days in the window's month to get the window end.
        final ZonedDateTime endTime = startTime.plusDays(getDays(startTime.getYear(), startTime.getMonthValue()));
        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
        windows.put(toEpochMilli(startTime), new TimeWindow(toEpochMilli(startTime), toEpochMilli(endTime)));
        return windows;
    }

    // YearMonth.lengthOfMonth() already accounts for leap years (29 days for February 2020).
    public static int getDays(int year, int month) {
        return YearMonth.of(year, month).lengthOfMonth();
    }
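As a quick check of the approach above, here is a minimal sketch that uses only java.time (no Kafka classes). The class name MonthEndCheck and its methods are made up for illustration, and UTC is assumed as the zone. It suggests that adding lengthOfMonth() days gives the same window end as plusMonths(1) when the start is the first day of the month, and that the window length still varies from month to month.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Kafka Streams or the linked example.
final class MonthEndCheck {

    // Local midnight on the first day of the calendar month containing the timestamp.
    static ZonedDateTime monthStart(final long timestamp, final ZoneId zoneId) {
        return Instant.ofEpochMilli(timestamp).atZone(zoneId)
                .truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
    }

    // Window end computed as in the question: start of the next month.
    static long endViaPlusMonths(final long timestamp, final ZoneId zoneId) {
        return monthStart(timestamp, zoneId).plusMonths(1).toInstant().toEpochMilli();
    }

    // Window end computed as in the answer: start plus the month's length in days.
    static long endViaLengthOfMonth(final long timestamp, final ZoneId zoneId) {
        final ZonedDateTime start = monthStart(timestamp, zoneId);
        final int days = YearMonth.of(start.getYear(), start.getMonthValue()).lengthOfMonth();
        return start.plusDays(days).toInstant().toEpochMilli();
    }

    // Length of the month window in whole days (exact in UTC, which has no DST shifts).
    static long windowDays(final long timestamp, final ZoneId zoneId) {
        final long startMs = monthStart(timestamp, zoneId).toInstant().toEpochMilli();
        return Duration.ofMillis(endViaPlusMonths(timestamp, zoneId) - startMs).toDays();
    }

    public static void main(final String[] args) {
        final ZoneId utc = ZoneId.of("UTC");
        final long feb2020 = Instant.parse("2020-02-15T12:00:00Z").toEpochMilli();
        final long feb2021 = Instant.parse("2021-02-15T12:00:00Z").toEpochMilli();
        final long mar2020 = Instant.parse("2020-03-15T12:00:00Z").toEpochMilli();

        System.out.println(windowDays(feb2020, utc)); // 29
        System.out.println(windowDays(feb2021, utc)); // 28
        System.out.println(windowDays(mar2020, utc)); // 31
        // Both ways of computing the end agree.
        System.out.println(endViaPlusMonths(feb2020, utc) == endViaLengthOfMonth(feb2020, utc)); // true
    }
}
```

Either way, the window boundaries can be computed; what stays open is the single fixed value that the size method is expected to return.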
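Unfortunately, Kafka Streams does not currently support calendar-based windows. There is a ticket requesting it.
The main problem is how Kafka Streams serializes time windows. The tests for the example you linked include a test with an explanation of this limitation.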
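To illustrate why serialization gets in the way, here is a rough sketch in plain Java. It assumes, as I understand the default windowed serde, that only the window start is written out and the end is rebuilt on read as start plus one fixed window size. The class FixedSizeWindowRoundTrip and its method are hypothetical stand-ins, not Kafka Streams API, and the 30-day size is just an example value.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for a windowed key round trip; not Kafka Streams code.
final class FixedSizeWindowRoundTrip {

    // Rebuilds the window end from the stored start and a single fixed size,
    // which is the assumption this sketch makes about deserialization.
    static long reconstructEnd(final long startMs, final long fixedSizeMs) {
        return startMs + fixedSizeMs;
    }

    public static void main(final String[] args) {
        // January 2020 window in UTC: the real end is the start of February.
        final long start = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
        final long realEnd = Instant.parse("2020-02-01T00:00:00Z").toEpochMilli();

        // Assumed fixed size of 30 days, since a single value has to be chosen.
        final long fixedSize = Duration.ofDays(30).toMillis();
        final long rebuiltEnd = reconstructEnd(start, fixedSize);

        System.out.println(Instant.ofEpochMilli(rebuiltEnd)); // 2020-01-31T00:00:00Z
        System.out.println(rebuiltEnd == realEnd);            // false
    }
}
```

Whatever fixed size is chosen, it can match at most some months, so a window that passes through such a round trip may come back with an end that differs from the calendar month end.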