如何在 1 周后处理来自分布式日志代理(例如 Kafka)的日志?
How to process logs from distributed log broker (Eg Kafka) exactly after 1 week?
如果我想处理来自 Kafka 的恰好 1 周前的日志,可以设置什么?
Usecase 是我维护最近 1 周用户的累积统计数据 activity。我对最终一致性很好,不需要正好 1 周的统计数据。
我有一个流式设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的 activity 都应该从统计数据中删除。我可以实现的方法之一是使用批处理(例如 Spark)从统计信息中删除 activity 超过 1 周的时间。
有什么方法可以使用流处理从统计信息中删除 activity 超过 1 周的用户?各种方法的优缺点是什么?
如果我在Kafka中至少使用过一次并且统计数据偏离了ground truth,有什么方法可以定期纠正统计数据?
如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
Look up the offsets for the given partitions by timestamp. The
returned offset for each partition is the earliest offset whose
timestamp is greater than or equal to the given timestamp in the
corresponding partition.
要获取主题分区列表,您可以调用 consumer.assignment()
(在 subscribe()
或 assign()
之后),其中 returns Set<TopicPartition>
分配给消费者.地图中的 Long
值基本上是时间戳。因此,对于您案例中的所有键,它将是相同的值(即 1 周前的时间戳)
现在,你已经得到了 Map<TopicPartition, OffsetAndTimestamp>
。您现在可以使用 seek(TopicPartition partition, long offset)
查找每个偏移量。
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
现在,您的用户将位于一周前的消息位置。所以,当你 poll()
,你从上周到现在投票。
您可以更改时间戳以满足您的要求,例如,任何超过 1 周的时间都表示从时间戳 0 到上周时间戳。
所有前一周的数据意味着,2weekOldTimestamp - 1weekOldTimestamp
。
因此,在这种情况下,您必须寻找 2weekOldTimestamp
,然后处理每个分区,直到遇到 1weekOldTimestamp
如果我想处理来自 Kafka 的恰好 1 周前的日志,可以设置什么?
Usecase 是我维护最近 1 周用户的累积统计数据 activity。我对最终一致性很好,不需要正好 1 周的统计数据。
我有一个流式设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的 activity 都应该从统计数据中删除。我可以实现的方法之一是使用批处理(例如 Spark)从统计信息中删除 activity 超过 1 周的时间。
有什么方法可以使用流处理从统计信息中删除 activity 超过 1 周的用户?各种方法的优缺点是什么?
如果我在Kafka中至少使用过一次并且统计数据偏离了ground truth,有什么方法可以定期纠正统计数据?
如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
要获取主题分区列表,您可以调用 consumer.assignment()
(在 subscribe()
或 assign()
之后),其中 returns Set<TopicPartition>
分配给消费者.地图中的 Long
值基本上是时间戳。因此,对于您案例中的所有键,它将是相同的值(即 1 周前的时间戳)
现在,你已经得到了 Map<TopicPartition, OffsetAndTimestamp>
。您现在可以使用 seek(TopicPartition partition, long offset)
查找每个偏移量。
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
现在,您的用户将位于一周前的消息位置。所以,当你 poll()
,你从上周到现在投票。
您可以更改时间戳以满足您的要求,例如,任何超过 1 周的时间都表示从时间戳 0 到上周时间戳。
所有前一周的数据意味着,2weekOldTimestamp - 1weekOldTimestamp
。
因此,在这种情况下,您必须寻找 2weekOldTimestamp
,然后处理每个分区,直到遇到 1weekOldTimestamp