Kafka 流 - TimeWindows
Kafka Stream - TimeWindows
我有一个关于 TimeWindows with Kafka Streams 的问题,有些概念让我很困惑。
我们有一个主题,每天有 1000 万个事件,我们的日志保留时间为 6 天,因此主题总共包含 6000 万个事件。
实际上我们只对当天的事件感兴趣,其余的我们只保留 5 天用于审计。
现在我从中创建了一个 KTable,我正在加载所有操作并迭代事件。正如我之前提到的,实际上我们只对当天的事件感兴趣,而不是 6000 万个事件,所以我在 KTable 定义中对该数据进行了窗口化。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我使用以下语句加载所有事件时,一切运行正常。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
当天早些时候的问题是,这将加载 100 万个事件,但稍后会加载 1000 万个事件,所以我必须迭代超过 1000 万个事件,而我们正在以批处理模式工作,我想我可以进一步优化这个并且只加载最后一个小时的事件所以对于相同的 KTable 配置,我尝试使用以下语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,这并没有返回任何数据。
有人可以解释为什么这没有返回任何结果,我想我误解了 TimeWindow 概念中的某些内容。
然后我做了一些进一步的测试,并将我的 KTable 配置更改为以下内容。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
现在这个查询功能如我所愿
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我走的路是否正确...
如果我对最新的 KTable 配置使用以下语句,这会为我提供当天的 1000 万个事件吗?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
当您在 windowed 商店上使用交互式查询时,时间范围将应用于 window 开始时间戳。因此,如果您有一个 1 天的 window,并从 [now - 1 hour, now)
开始查询具有 window 开始时间戳的数据,您将找不到任何匹配的 windows,因为没有 window 在此时间范围内开始。
我有一个关于 TimeWindows with Kafka Streams 的问题,有些概念让我很困惑。
我们有一个主题,每天有 1000 万个事件,我们的日志保留时间为 6 天,因此主题总共包含 6000 万个事件。
实际上我们只对当天的事件感兴趣,其余的我们只保留 5 天用于审计。
现在我从中创建了一个 KTable,我正在加载所有操作并迭代事件。正如我之前提到的,实际上我们只对当天的事件感兴趣,而不是 6000 万个事件,所以我在 KTable 定义中对该数据进行了窗口化。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我使用以下语句加载所有事件时,一切运行正常。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
当天早些时候的问题是,这将加载 100 万个事件,但稍后会加载 1000 万个事件,所以我必须迭代超过 1000 万个事件,而我们正在以批处理模式工作,我想我可以进一步优化这个并且只加载最后一个小时的事件所以对于相同的 KTable 配置,我尝试使用以下语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,这并没有返回任何数据。
有人可以解释为什么这没有返回任何结果,我想我误解了 TimeWindow 概念中的某些内容。
然后我做了一些进一步的测试,并将我的 KTable 配置更改为以下内容。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
现在这个查询功能如我所愿
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我走的路是否正确...
如果我对最新的 KTable 配置使用以下语句,这会为我提供当天的 1000 万个事件吗?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
当您在 windowed 商店上使用交互式查询时,时间范围将应用于 window 开始时间戳。因此,如果您有一个 1 天的 window,并从 [now - 1 hour, now)
开始查询具有 window 开始时间戳的数据,您将找不到任何匹配的 windows,因为没有 window 在此时间范围内开始。